Coverage for tests / test_visit_image.py: 16%

289 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-16 07:54 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 Box, 

27 DetectorFrame, 

28 Image, 

29 MaskPlane, 

30 MaskSchema, 

31 ObservationSummaryStats, 

32 Polygon, 

33 ProjectionAstropyView, 

34 TractFrame, 

35 VisitImage, 

36 get_legacy_visit_image_mask_planes, 

37) 

38from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy 

39from lsst.images.cameras import Detector 

40from lsst.images.fields import ChebyshevField 

41from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

42from lsst.images.json import read as read_json 

43from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

44from lsst.images.tests import ( 

45 DP2_VISIT_DETECTOR_DATA_ID, 

46 RoundtripFits, 

47 RoundtripNdf, 

48 TemporaryButler, 

49 assert_masked_images_equal, 

50 assert_projections_equal, 

51 compare_aperture_corrections_to_legacy, 

52 compare_detector_to_legacy, 

53 compare_visit_image_to_legacy, 

54 make_random_projection, 

55) 

56 

57try: 

58 import h5py # noqa: F401 

59 

60 HAVE_H5PY = True 

61except ImportError: 

62 HAVE_H5PY = False 

63 

64EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

65LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 

66 

67 

68class VisitImageTestCase(unittest.TestCase): 

69 """Basic Tests for VisitImage.""" 

70 

71 @classmethod 

72 def setUpClass(cls) -> None: 

73 cls.rng = np.random.default_rng(500) 

74 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

75 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

76 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

77 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4) 

78 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

79 cls.aperture_corrections: ApertureCorrectionMap = { 

80 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])), 

81 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])), 

82 } 

83 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json")) 

84 

85 opaque = FitsOpaqueMetadata() 

86 hdr = astropy.io.fits.Header() 

87 with warnings.catch_warnings(): 

88 # Silence warnings about long keys becoming HIERARCH. 

89 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

90 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

91 opaque.extract_legacy_primary_header(hdr) 

92 

93 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

94 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

95 # polygon is the lower triangle of the image. 

96 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5]) 

97 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

98 # API signature suggests projection and obs_info can be None but they 

99 # are required (unless you pass them in via the image plane). 

100 cls.visit_image = VisitImage( 

101 cls.image, 

102 variance=cls.variance, 

103 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

104 mask_schema=cls.mask_schema, 

105 projection=cls.projection, 

106 obs_info=cls.obs_info, 

107 summary_stats=cls.summary_stats, 

108 detector=cls.detector, 

109 bounds=cls.polygon, 

110 aperture_corrections=cls.aperture_corrections, 

111 ) 

112 cls.visit_image._opaque_metadata = opaque 

113 cls.simplest_visit_image = VisitImage( 

114 cls.image, 

115 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

116 mask_schema=cls.mask_schema, 

117 projection=cls.projection, 

118 detector=cls.detector, 

119 obs_info=cls.obs_info, 

120 ) 

121 

122 def test_basics(self) -> None: 

123 """Test basic constructor patterns.""" 

124 # Test default fill of variance. 

125 visit = self.simplest_visit_image 

126 self.assertEqual(visit.variance.array[0, 0], 1.0) 

127 self.assertIs(visit[...], visit) 

128 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

129 self.assertEqual( 

130 repr(visit), 

131 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

132 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

133 " dtype=dtype('uint8')))", 

134 ) 

135 

136 astropy_wcs = visit.astropy_wcs 

137 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

138 approx_wcs = visit.fits_wcs 

139 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

140 

141 with self.assertRaises(TypeError): 

142 # Requires a PSF. 

143 VisitImage( 

144 self.image, 

145 mask_schema=self.mask_schema, 

146 projection=self.projection, 

147 obs_info=self.obs_info, 

148 detector=self.detector, 

149 ) 

150 

151 with self.assertRaises(TypeError): 

152 # Requires ObservationInfo. 

153 VisitImage( 

154 self.image, 

155 psf=self.gaussian_psf, 

156 mask_schema=self.mask_schema, 

157 projection=self.projection, 

158 detector=self.detector, 

159 ) 

160 

161 with self.assertRaises(TypeError): 

162 # Requires a projection. 

163 VisitImage( 

164 self.image, 

165 psf=self.gaussian_psf, 

166 mask_schema=self.mask_schema, 

167 obs_info=self.obs_info, 

168 detector=self.detector, 

169 ) 

170 

171 with self.assertRaises(TypeError): 

172 # Requires a detector. 

173 VisitImage( 

174 self.image, 

175 psf=self.gaussian_psf, 

176 mask_schema=self.mask_schema, 

177 projection=self.projection, 

178 obs_info=self.obs_info, 

179 ) 

180 

181 with self.assertRaises(TypeError): 

182 # Requires some form of mask. 

183 VisitImage( 

184 self.image, 

185 psf=self.gaussian_psf, 

186 projection=self.projection, 

187 obs_info=self.obs_info, 

188 detector=self.detector, 

189 ) 

190 

191 with self.assertRaises(TypeError): 

192 VisitImage( 

193 Image(42, shape=(5, 5)), 

194 psf=self.gaussian_psf, 

195 mask_schema=self.mask_schema, 

196 projection=self.projection, 

197 obs_info=self.obs_info, 

198 detector=self.detector, 

199 ) 

200 

201 # Requires a DetectorFrame. 

202 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

203 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

204 with self.assertRaises(TypeError): 

205 VisitImage( 

206 self.image, 

207 projection=tract_proj, 

208 psf=self.gaussian_psf, 

209 mask_schema=self.mask_schema, 

210 obs_info=self.obs_info, 

211 detector=self.detector, 

212 ) 

213 

214 # Variance unit mismatch. 

215 with self.assertRaises(ValueError): 

216 VisitImage( 

217 self.image, 

218 variance=self.image, 

219 psf=self.gaussian_psf, 

220 mask_schema=self.mask_schema, 

221 projection=self.projection, 

222 obs_info=self.obs_info, 

223 detector=self.detector, 

224 ) 

225 

226 def test_copy_and_slice(self) -> None: 

227 """Test that arrays and components are copied (when not immutable) by 

228 'copy' and referenced by 'slice'. 

229 """ 

230 visit = self.visit_image 

231 copy = visit.copy() 

232 copy.image.array[0, 0] = 30.0 

233 self.assertEqual(visit.image.array[0, 0], 42.0) 

234 self.assertEqual(copy.image.array[0, 0], 30.0) 

235 subvisit = visit[Box.factory[0:5, 0:5]] 

236 # Check summary stats. 

237 self.assertEqual(copy.summary_stats, visit.summary_stats) 

238 self.assertIsNot(copy.summary_stats, visit.summary_stats) 

239 self.assertEqual(subvisit.summary_stats, visit.summary_stats) 

240 self.assertIs(subvisit.summary_stats, visit.summary_stats) 

241 # Check aperture corrections. 

242 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

243 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections) 

244 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

245 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections) 

246 # Check bounds. 

247 self.assertIs(copy.bounds, self.polygon) 

248 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox 

249 

250 def test_obs_info(self) -> None: 

251 """Check that ObservationInfo has been constructed.""" 

252 visit = self.visit_image 

253 self.assertIsNotNone(visit.obs_info) 

254 self.maxDiff = None 

255 assert visit.obs_info is not None # for mypy. 

256 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

257 

258 def test_summary_stats(self) -> None: 

259 """Test the comparisons and attributes of ObservationSummaryStats.""" 

260 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)) 

261 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5)) 

262 self.assertNotEqual( 

263 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2)) 

264 ) 

265 

266 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

267 def test_round_trip_ndf(self): 

268 """NDF round-trip for VisitImage.""" 

269 with RoundtripNdf(self, self.visit_image) as roundtrip: 

270 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

271 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats) 

272 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf)) 

273 

274 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed") 

275 def test_fits_ndf_consistency(self): 

276 """FITS and NDF backends produce equal VisitImages on round-trip.""" 

277 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt: 

278 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False) 

279 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False) 

280 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False) 

281 

282 def test_read_write(self) -> None: 

283 """Test that a visit can round trip through a FITS file.""" 

284 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

285 # Check that we're still using the right compression, and that we 

286 # wrote WCSs. 

287 fits = roundtrip.inspect() 

288 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

289 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

290 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

291 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

292 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

293 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

294 # Check a subimage read. 

295 subbox = Box.factory[8:13, 9:30] 

296 subimage = roundtrip.get(bbox=subbox) 

297 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

298 with self.subTest(): 

299 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

300 with self.subTest(): 

301 obs_info = roundtrip.get("obs_info") 

302 self.assertIsInstance(obs_info, ObservationInfo) 

303 self.assertEqual(obs_info, self.visit_image.obs_info) 

304 with self.subTest(): 

305 summary_stats = roundtrip.get("summary_stats") 

306 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

307 self.assertEqual(summary_stats, self.visit_image.summary_stats) 

308 with self.subTest(): 

309 psf = roundtrip.get("psf") 

310 self.assertIsInstance(psf, GaussianPointSpreadFunction) 

311 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox) 

312 

313 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

314 # Check that the round-tripped headers are the same (up to card order). 

315 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

316 self.assertEqual( 

317 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

318 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

319 ) 

320 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

321 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

322 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

323 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info) 

324 self.assertIsNotNone(roundtrip.result.summary_stats) 

325 self.assertEqual( 

326 roundtrip.result.summary_stats.psfSigma, 

327 self.visit_image.summary_stats.psfSigma, 

328 ) 

329 self.assertEqual( 

330 roundtrip.result.summary_stats.zeroPoint, 

331 self.visit_image.summary_stats.zeroPoint, 

332 ) 

333 self.assertEqual(roundtrip.result.bounds, self.polygon) 

334 

335 

336@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

337class VisitImageLegacyTestCase(unittest.TestCase): 

338 """Tests for the VisitImage class and the basics of the archive system. 

339 

340 Requires legacy code. 

341 """ 

342 

343 @classmethod 

344 def setUpClass(cls) -> None: 

345 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

346 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits") 

347 try: 

348 from lsst.afw.image import ExposureFitsReader 

349 

350 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

351 except ImportError: 

352 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

353 cls.plane_map = plane_map = get_legacy_visit_image_mask_planes() 

354 cls.visit_image = VisitImage.read_legacy( 

355 cls.filename, preserve_quantization=True, plane_map=plane_map 

356 ) 

357 

358 def test_legacy_errors(self) -> None: 

359 """Legacy read failure modes.""" 

360 with self.assertRaises(ValueError): 

361 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

362 with self.assertRaises(ValueError): 

363 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

364 with self.assertRaises(ValueError): 

365 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

366 visit = VisitImage.from_legacy( 

367 self.legacy_exposure, instrument="LSSTCam", unit=u.nJy, visit=2025052000177 

368 ) 

369 self.assertEqual(visit.unit, u.nJy) 

370 

371 with self.assertRaises(ValueError): 

372 VisitImage.read_legacy(self.filename, instrument="HSC") 

373 with self.assertRaises(ValueError): 

374 VisitImage.read_legacy(self.filename, visit=123456) 

375 

376 def test_component_reads(self) -> None: 

377 """Test reads of components from legacy file.""" 

378 visit = VisitImage.read_legacy(self.filename) 

379 proj = VisitImage.read_legacy(self.filename, component="projection") 

380 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

381 image = VisitImage.read_legacy(self.filename, component="image") 

382 self.assertEqual(image, visit.image) 

383 self.check_legacy_obs_info(image.obs_info) 

384 assert_projections_equal(self, proj, image.projection, expect_identity=False) 

385 variance = VisitImage.read_legacy(self.filename, component="variance") 

386 self.assertEqual(variance, visit.variance) 

387 assert_projections_equal(self, proj, variance.projection, expect_identity=False) 

388 self.check_legacy_obs_info(variance.obs_info) 

389 mask = VisitImage.read_legacy(self.filename, component="mask") 

390 self.assertEqual(mask, visit.mask) 

391 assert_projections_equal(self, proj, mask.projection, expect_identity=False) 

392 self.check_legacy_obs_info(mask.obs_info) 

393 psf = VisitImage.read_legacy(self.filename, component="psf") 

394 self.assertIsInstance(psf, PointSpreadFunction) 

395 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

396 self.check_legacy_obs_info(obs_info) 

397 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats") 

398 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

399 self.assertEqual(summary_stats.nPsfStar, 93) 

400 compare_aperture_corrections_to_legacy( 

401 self, 

402 VisitImage.read_legacy(self.filename, component="aperture_corrections"), 

403 self.legacy_exposure.info.getApCorrMap(), 

404 visit.bbox, 

405 ) 

406 detector = VisitImage.read_legacy(self.filename, component="detector") 

407 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True) 

408 

409 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None: 

410 """Check that an `ObservationInfo` instance is not `None`, and that it 

411 matches the one in the legacy test data file. 

412 """ 

413 self.assertIsInstance(obs_info, ObservationInfo) 

414 self.assertEqual(obs_info.instrument, "LSSTCam") 

415 self.assertEqual(obs_info.detector_num, 85, obs_info) 

416 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info) 

417 self.assertEqual(obs_info.physical_filter, "r_57", obs_info) 

418 

419 def test_obs_info(self) -> None: 

420 """Check that ObservationInfo has been constructed.""" 

421 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

422 self.assertIsNotNone(legacy.obs_info) 

423 self.maxDiff = None 

424 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

425 assert legacy.obs_info is not None # for mypy. 

426 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

427 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info) 

428 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info) 

429 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info) 

430 

431 def test_aperture_corrections_to_legacy(self) -> None: 

432 """Test that we can convert an aperture correction map back to a 

433 legacy `lsst.afw.image.ApCorrMap`. 

434 """ 

435 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections) 

436 compare_aperture_corrections_to_legacy( 

437 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox 

438 ) 

439 

440 def test_read_legacy_headers(self) -> None: 

441 """Test that headers were correctly stripped and interpreted in 

442 `VisitImage.read_legacy`. 

443 """ 

444 # Check that we read the units from BUNIT. 

445 self.assertEqual(self.visit_image.unit, astropy.units.nJy) 

446 # Check that the primary header has the keys we want, and none of the 

447 # keys we don't want. 

448 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

449 self.assertIn("EXPTIME", header) 

450 self.assertEqual(header["PLATFORM"], "lsstcam") 

451 self.assertNotIn("LSST BUTLER ID", header) 

452 self.assertNotIn("AR HDU", header) 

453 self.assertNotIn("A_ORDER", header) 

454 # Check that the extension HDUs do not have any custom headers. 

455 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

456 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

457 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

458 

459 def test_from_legacy_headers(self) -> None: 

460 """Test that from_legacy handles headers properly.""" 

461 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

462 header = legacy._opaque_metadata.headers[ExtensionKey()] 

463 self.assertIn("EXPTIME", header) 

464 self.assertEqual(header["PLATFORM"], "lsstcam") 

465 self.assertNotIn("LSST BUTLER ID", header) 

466 self.assertNotIn("AR HDU", header) 

467 self.assertNotIn("A_ORDER", header) 

468 # Check that the extension HDUs do not have any custom headers. 

469 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

470 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

471 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

472 

473 def test_rewrite(self) -> None: 

474 """Test that we can rewrite the visit image and preserve both 

475 lossy-compressed pixel values and components exactly. 

476 """ 

477 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

478 # Check that we're still using the right compression, and that we 

479 # wrote WCSs. 

480 fits = roundtrip.inspect() 

481 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

482 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

483 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

484 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

485 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

486 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

487 # Check a subimage read. 

488 subbox = Box.factory[8:13, 9:30] 

489 subimage = roundtrip.get(bbox=subbox) 

490 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

491 alternates: dict[str, Any] = {} 

492 with self.subTest(): 

493 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

494 alternates = { 

495 k: roundtrip.get(k) 

496 for k in [ 

497 "projection", 

498 "image", 

499 "mask", 

500 "variance", 

501 "psf", 

502 "obs_info", 

503 "summary_stats", 

504 "aperture_corrections", 

505 "detector", 

506 ] 

507 } 

508 # Try to do a butler get of a component with storage class 

509 # override. 

510 with self.subTest(): 

511 if self.legacy_exposure is not None: 

512 import lsst.afw.image 

513 

514 # We have VisitInfo available. 

515 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

516 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

517 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

518 else: 

519 raise unittest.SkipTest("Can not test VisitInfo conversion without afw") 

520 

521 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

522 # Check that the round-tripped headers are the same (up to card order). 

523 self.assertEqual( 

524 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

525 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

526 ) 

527 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

528 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

529 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

530 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

531 compare_visit_image_to_legacy( 

532 self, 

533 roundtrip.result, 

534 self.legacy_exposure, 

535 expect_view=False, 

536 plane_map=self.plane_map, 

537 **DP2_VISIT_DETECTOR_DATA_ID, 

538 alternates=alternates, 

539 ) 

540 # Check converting from the legacy object in-memory. 

541 compare_visit_image_to_legacy( 

542 self, 

543 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

544 self.legacy_exposure, 

545 expect_view=True, 

546 plane_map=self.plane_map, 

547 **DP2_VISIT_DETECTOR_DATA_ID, 

548 ) 

549 

550 def test_butler_converters(self) -> None: 

551 """Test that we can read a VisitImage and its components from a butler 

552 dataset written as an `lsst.afw.image.Exposure`. 

553 """ 

554 if self.legacy_exposure is None: 

555 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

556 with TemporaryButler(legacy="ExposureF") as helper: 

557 from lsst.daf.butler import FileDataset 

558 

559 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

560 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

561 visit_image = helper.butler.get(visit_image_ref) 

562 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

563 self.assertEqual(bbox, visit_image.bbox) 

564 alternates = { 

565 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

566 # TODO: including "projection" or "obs_info" here fails because 

567 # there's code in daf_butler that expects any component to be 

568 # valid for the *internal* storage class, not the requested 

569 # one, and that's difficult to fix because it's tied up with 

570 # the data ID standardization logic. 

571 for k in ["image", "mask", "variance", "psf", "detector"] 

572 } 

573 compare_visit_image_to_legacy( 

574 self, 

575 visit_image, 

576 self.legacy_exposure, 

577 expect_view=False, 

578 plane_map=self.plane_map, 

579 alternates=alternates, 

580 **DP2_VISIT_DETECTOR_DATA_ID, 

581 ) 

582 

583 

584if __name__ == "__main__": 

585 unittest.main()