Coverage for tests / test_visit_image.py: 15%

272 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-15 01:54 -0700

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 Box, 

27 DetectorFrame, 

28 Image, 

29 MaskPlane, 

30 MaskSchema, 

31 ObservationSummaryStats, 

32 Polygon, 

33 ProjectionAstropyView, 

34 TractFrame, 

35 VisitImage, 

36 get_legacy_visit_image_mask_planes, 

37) 

38from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy 

39from lsst.images.cameras import Detector 

40from lsst.images.fields import ChebyshevField 

41from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

42from lsst.images.json import read as read_json 

43from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

44from lsst.images.tests import ( 

45 DP2_VISIT_DETECTOR_DATA_ID, 

46 RoundtripFits, 

47 TemporaryButler, 

48 assert_masked_images_equal, 

49 assert_projections_equal, 

50 compare_aperture_corrections_to_legacy, 

51 compare_detector_to_legacy, 

52 compare_visit_image_to_legacy, 

53 make_random_projection, 

54) 

55 

56EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

57LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") 

58 

59 

60class VisitImageTestCase(unittest.TestCase): 

61 """Basic Tests for VisitImage.""" 

62 

63 @classmethod 

64 def setUpClass(cls) -> None: 

65 cls.rng = np.random.default_rng(500) 

66 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

67 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

68 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

69 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4) 

70 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

71 cls.aperture_corrections: ApertureCorrectionMap = { 

72 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])), 

73 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])), 

74 } 

75 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json")) 

76 

77 opaque = FitsOpaqueMetadata() 

78 hdr = astropy.io.fits.Header() 

79 with warnings.catch_warnings(): 

80 # Silence warnings about long keys becoming HIERARCH. 

81 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

82 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

83 opaque.extract_legacy_primary_header(hdr) 

84 

85 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

86 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

87 # polygon is the lower triangle of the image. 

88 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5]) 

89 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

90 # API signature suggests projection and obs_info can be None but they 

91 # are required (unless you pass them in via the image plane). 

92 cls.visit_image = VisitImage( 

93 cls.image, 

94 variance=cls.variance, 

95 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

96 mask_schema=cls.mask_schema, 

97 projection=cls.projection, 

98 obs_info=cls.obs_info, 

99 summary_stats=cls.summary_stats, 

100 detector=cls.detector, 

101 bounds=cls.polygon, 

102 aperture_corrections=cls.aperture_corrections, 

103 ) 

104 cls.visit_image._opaque_metadata = opaque 

105 cls.simplest_visit_image = VisitImage( 

106 cls.image, 

107 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

108 mask_schema=cls.mask_schema, 

109 projection=cls.projection, 

110 detector=cls.detector, 

111 obs_info=cls.obs_info, 

112 ) 

113 

114 def test_basics(self) -> None: 

115 """Test basic constructor patterns.""" 

116 # Test default fill of variance. 

117 visit = self.simplest_visit_image 

118 self.assertEqual(visit.variance.array[0, 0], 1.0) 

119 self.assertIs(visit[...], visit) 

120 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

121 self.assertEqual( 

122 repr(visit), 

123 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

124 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

125 " dtype=dtype('uint8')))", 

126 ) 

127 

128 astropy_wcs = visit.astropy_wcs 

129 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

130 approx_wcs = visit.fits_wcs 

131 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

132 

133 with self.assertRaises(TypeError): 

134 # Requires a PSF. 

135 VisitImage( 

136 self.image, 

137 mask_schema=self.mask_schema, 

138 projection=self.projection, 

139 obs_info=self.obs_info, 

140 detector=self.detector, 

141 ) 

142 

143 with self.assertRaises(TypeError): 

144 # Requires ObservationInfo. 

145 VisitImage( 

146 self.image, 

147 psf=self.gaussian_psf, 

148 mask_schema=self.mask_schema, 

149 projection=self.projection, 

150 detector=self.detector, 

151 ) 

152 

153 with self.assertRaises(TypeError): 

154 # Requires a projection. 

155 VisitImage( 

156 self.image, 

157 psf=self.gaussian_psf, 

158 mask_schema=self.mask_schema, 

159 obs_info=self.obs_info, 

160 detector=self.detector, 

161 ) 

162 

163 with self.assertRaises(TypeError): 

164 # Requires a detector. 

165 VisitImage( 

166 self.image, 

167 psf=self.gaussian_psf, 

168 mask_schema=self.mask_schema, 

169 projection=self.projection, 

170 obs_info=self.obs_info, 

171 ) 

172 

173 with self.assertRaises(TypeError): 

174 # Requires some form of mask. 

175 VisitImage( 

176 self.image, 

177 psf=self.gaussian_psf, 

178 projection=self.projection, 

179 obs_info=self.obs_info, 

180 detector=self.detector, 

181 ) 

182 

183 with self.assertRaises(TypeError): 

184 VisitImage( 

185 Image(42, shape=(5, 5)), 

186 psf=self.gaussian_psf, 

187 mask_schema=self.mask_schema, 

188 projection=self.projection, 

189 obs_info=self.obs_info, 

190 detector=self.detector, 

191 ) 

192 

193 # Requires a DetectorFrame. 

194 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

195 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

196 with self.assertRaises(TypeError): 

197 VisitImage( 

198 self.image, 

199 projection=tract_proj, 

200 psf=self.gaussian_psf, 

201 mask_schema=self.mask_schema, 

202 obs_info=self.obs_info, 

203 detector=self.detector, 

204 ) 

205 

206 # Variance unit mismatch. 

207 with self.assertRaises(ValueError): 

208 VisitImage( 

209 self.image, 

210 variance=self.image, 

211 psf=self.gaussian_psf, 

212 mask_schema=self.mask_schema, 

213 projection=self.projection, 

214 obs_info=self.obs_info, 

215 detector=self.detector, 

216 ) 

217 

218 def test_copy_and_slice(self) -> None: 

219 """Test that arrays and components are copied (when not immutable) by 

220 'copy' and referenced by 'slice'. 

221 """ 

222 visit = self.visit_image 

223 copy = visit.copy() 

224 copy.image.array[0, 0] = 30.0 

225 self.assertEqual(visit.image.array[0, 0], 42.0) 

226 self.assertEqual(copy.image.array[0, 0], 30.0) 

227 subvisit = visit[Box.factory[0:5, 0:5]] 

228 # Check summary stats. 

229 self.assertEqual(copy.summary_stats, visit.summary_stats) 

230 self.assertIsNot(copy.summary_stats, visit.summary_stats) 

231 self.assertEqual(subvisit.summary_stats, visit.summary_stats) 

232 self.assertIs(subvisit.summary_stats, visit.summary_stats) 

233 # Check aperture corrections. 

234 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

235 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections) 

236 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys()) 

237 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections) 

238 # Check bounds. 

239 self.assertIs(copy.bounds, self.polygon) 

240 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox 

241 

242 def test_obs_info(self) -> None: 

243 """Check that ObservationInfo has been constructed.""" 

244 visit = self.visit_image 

245 self.assertIsNotNone(visit.obs_info) 

246 self.maxDiff = None 

247 assert visit.obs_info is not None # for mypy. 

248 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

249 

250 def test_summary_stats(self) -> None: 

251 """Test the comparisons and attributes of ObservationSummaryStats.""" 

252 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)) 

253 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5)) 

254 self.assertNotEqual( 

255 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2)) 

256 ) 

257 

258 def test_read_write(self) -> None: 

259 """Test that a visit can round trip through a FITS file.""" 

260 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

261 # Check that we're still using the right compression, and that we 

262 # wrote WCSs. 

263 fits = roundtrip.inspect() 

264 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

265 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

266 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

267 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

268 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

269 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

270 # Check a subimage read. 

271 subbox = Box.factory[8:13, 9:30] 

272 subimage = roundtrip.get(bbox=subbox) 

273 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

274 with self.subTest(): 

275 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

276 with self.subTest(): 

277 obs_info = roundtrip.get("obs_info") 

278 self.assertIsInstance(obs_info, ObservationInfo) 

279 self.assertEqual(obs_info, self.visit_image.obs_info) 

280 with self.subTest(): 

281 summary_stats = roundtrip.get("summary_stats") 

282 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

283 self.assertEqual(summary_stats, self.visit_image.summary_stats) 

284 with self.subTest(): 

285 psf = roundtrip.get("psf") 

286 self.assertIsInstance(psf, GaussianPointSpreadFunction) 

287 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox) 

288 

289 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

290 # Check that the round-tripped headers are the same (up to card order). 

291 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

292 self.assertEqual( 

293 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

294 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

295 ) 

296 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

297 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

298 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

299 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info) 

300 self.assertIsNotNone(roundtrip.result.summary_stats) 

301 self.assertEqual( 

302 roundtrip.result.summary_stats.psfSigma, 

303 self.visit_image.summary_stats.psfSigma, 

304 ) 

305 self.assertEqual( 

306 roundtrip.result.summary_stats.zeroPoint, 

307 self.visit_image.summary_stats.zeroPoint, 

308 ) 

309 self.assertEqual(roundtrip.result.bounds, self.polygon) 

310 

311 

312@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

313class VisitImageLegacyTestCase(unittest.TestCase): 

314 """Tests for the VisitImage class and the basics of the archive system. 

315 

316 Requires legacy code. 

317 """ 

318 

319 @classmethod 

320 def setUpClass(cls) -> None: 

321 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator." 

322 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits") 

323 try: 

324 from lsst.afw.image import ExposureFitsReader 

325 

326 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

327 except ImportError: 

328 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None 

329 cls.plane_map = plane_map = get_legacy_visit_image_mask_planes() 

330 cls.visit_image = VisitImage.read_legacy( 

331 cls.filename, preserve_quantization=True, plane_map=plane_map 

332 ) 

333 

334 def test_legacy_errors(self) -> None: 

335 """Legacy read failure modes.""" 

336 with self.assertRaises(ValueError): 

337 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

338 with self.assertRaises(ValueError): 

339 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

340 with self.assertRaises(ValueError): 

341 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

342 visit = VisitImage.from_legacy( 

343 self.legacy_exposure, instrument="LSSTCam", unit=u.nJy, visit=2025052000177 

344 ) 

345 self.assertEqual(visit.unit, u.nJy) 

346 

347 with self.assertRaises(ValueError): 

348 VisitImage.read_legacy(self.filename, instrument="HSC") 

349 with self.assertRaises(ValueError): 

350 VisitImage.read_legacy(self.filename, visit=123456) 

351 

352 def test_component_reads(self) -> None: 

353 """Test reads of components from legacy file.""" 

354 visit = VisitImage.read_legacy(self.filename) 

355 proj = VisitImage.read_legacy(self.filename, component="projection") 

356 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

357 image = VisitImage.read_legacy(self.filename, component="image") 

358 self.assertEqual(image, visit.image) 

359 self.check_legacy_obs_info(image.obs_info) 

360 assert_projections_equal(self, proj, image.projection, expect_identity=False) 

361 variance = VisitImage.read_legacy(self.filename, component="variance") 

362 self.assertEqual(variance, visit.variance) 

363 assert_projections_equal(self, proj, variance.projection, expect_identity=False) 

364 self.check_legacy_obs_info(variance.obs_info) 

365 mask = VisitImage.read_legacy(self.filename, component="mask") 

366 self.assertEqual(mask, visit.mask) 

367 assert_projections_equal(self, proj, mask.projection, expect_identity=False) 

368 self.check_legacy_obs_info(mask.obs_info) 

369 psf = VisitImage.read_legacy(self.filename, component="psf") 

370 self.assertIsInstance(psf, PointSpreadFunction) 

371 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

372 self.check_legacy_obs_info(obs_info) 

373 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats") 

374 self.assertIsInstance(summary_stats, ObservationSummaryStats) 

375 self.assertEqual(summary_stats.nPsfStar, 93) 

376 compare_aperture_corrections_to_legacy( 

377 self, 

378 VisitImage.read_legacy(self.filename, component="aperture_corrections"), 

379 self.legacy_exposure.info.getApCorrMap(), 

380 visit.bbox, 

381 ) 

382 detector = VisitImage.read_legacy(self.filename, component="detector") 

383 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True) 

384 

385 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None: 

386 """Check that an `ObservationInfo` instance is not `None`, and that it 

387 matches the one in the legacy test data file. 

388 """ 

389 self.assertIsInstance(obs_info, ObservationInfo) 

390 self.assertEqual(obs_info.instrument, "LSSTCam") 

391 self.assertEqual(obs_info.detector_num, 85, obs_info) 

392 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info) 

393 self.assertEqual(obs_info.physical_filter, "r_57", obs_info) 

394 

395 def test_obs_info(self) -> None: 

396 """Check that ObservationInfo has been constructed.""" 

397 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

398 self.assertIsNotNone(legacy.obs_info) 

399 self.maxDiff = None 

400 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

401 assert legacy.obs_info is not None # for mypy. 

402 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

403 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info) 

404 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info) 

405 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info) 

406 

407 def test_aperture_corrections_to_legacy(self) -> None: 

408 """Test that we can convert an aperture correction map back to a 

409 legacy `lsst.afw.image.ApCorrMap`. 

410 """ 

411 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections) 

412 compare_aperture_corrections_to_legacy( 

413 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox 

414 ) 

415 

416 def test_read_legacy_headers(self) -> None: 

417 """Test that headers were correctly stripped and interpreted in 

418 `VisitImage.read_legacy`. 

419 """ 

420 # Check that we read the units from BUNIT. 

421 self.assertEqual(self.visit_image.unit, astropy.units.nJy) 

422 # Check that the primary header has the keys we want, and none of the 

423 # keys we don't want. 

424 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

425 self.assertIn("EXPTIME", header) 

426 self.assertEqual(header["PLATFORM"], "lsstcam") 

427 self.assertNotIn("LSST BUTLER ID", header) 

428 self.assertNotIn("AR HDU", header) 

429 self.assertNotIn("A_ORDER", header) 

430 # Check that the extension HDUs do not have any custom headers. 

431 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

432 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

433 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

434 

435 def test_from_legacy_headers(self) -> None: 

436 """Test that from_legacy handles headers properly.""" 

437 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

438 header = legacy._opaque_metadata.headers[ExtensionKey()] 

439 self.assertIn("EXPTIME", header) 

440 self.assertEqual(header["PLATFORM"], "lsstcam") 

441 self.assertNotIn("LSST BUTLER ID", header) 

442 self.assertNotIn("AR HDU", header) 

443 self.assertNotIn("A_ORDER", header) 

444 # Check that the extension HDUs do not have any custom headers. 

445 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

446 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

447 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

448 

449 def test_rewrite(self) -> None: 

450 """Test that we can rewrite the visit image and preserve both 

451 lossy-compressed pixel values and components exactly. 

452 """ 

453 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

454 # Check that we're still using the right compression, and that we 

455 # wrote WCSs. 

456 fits = roundtrip.inspect() 

457 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

458 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

459 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

460 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

461 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

462 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

463 # Check a subimage read. 

464 subbox = Box.factory[8:13, 9:30] 

465 subimage = roundtrip.get(bbox=subbox) 

466 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

467 alternates: dict[str, Any] = {} 

468 with self.subTest(): 

469 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

470 alternates = { 

471 k: roundtrip.get(k) 

472 for k in [ 

473 "projection", 

474 "image", 

475 "mask", 

476 "variance", 

477 "psf", 

478 "obs_info", 

479 "summary_stats", 

480 "aperture_corrections", 

481 "detector", 

482 ] 

483 } 

484 # Try to do a butler get of a component with storage class 

485 # override. 

486 with self.subTest(): 

487 if self.legacy_exposure is not None: 

488 import lsst.afw.image 

489 

490 # We have VisitInfo available. 

491 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

492 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

493 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

494 else: 

495 raise unittest.SkipTest("Can not test VisitInfo conversion without afw") 

496 

497 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

498 # Check that the round-tripped headers are the same (up to card order). 

499 self.assertEqual( 

500 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

501 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

502 ) 

503 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

504 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

505 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

506 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

507 compare_visit_image_to_legacy( 

508 self, 

509 roundtrip.result, 

510 self.legacy_exposure, 

511 expect_view=False, 

512 plane_map=self.plane_map, 

513 **DP2_VISIT_DETECTOR_DATA_ID, 

514 alternates=alternates, 

515 ) 

516 # Check converting from the legacy object in-memory. 

517 compare_visit_image_to_legacy( 

518 self, 

519 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

520 self.legacy_exposure, 

521 expect_view=True, 

522 plane_map=self.plane_map, 

523 **DP2_VISIT_DETECTOR_DATA_ID, 

524 ) 

525 

526 def test_butler_converters(self) -> None: 

527 """Test that we can read a VisitImage and its components from a butler 

528 dataset written as an `lsst.afw.image.Exposure`. 

529 """ 

530 if self.legacy_exposure is None: 

531 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

532 with TemporaryButler(legacy="ExposureF") as helper: 

533 from lsst.daf.butler import FileDataset 

534 

535 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

536 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

537 visit_image = helper.butler.get(visit_image_ref) 

538 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

539 self.assertEqual(bbox, visit_image.bbox) 

540 alternates = { 

541 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

542 # TODO: including "projection" or "obs_info" here fails because 

543 # there's code in daf_butler that expects any component to be 

544 # valid for the *internal* storage class, not the requested 

545 # one, and that's difficult to fix because it's tied up with 

546 # the data ID standardization logic. 

547 for k in ["image", "mask", "variance", "psf", "detector"] 

548 } 

549 compare_visit_image_to_legacy( 

550 self, 

551 visit_image, 

552 self.legacy_exposure, 

553 expect_view=False, 

554 plane_map=self.plane_map, 

555 alternates=alternates, 

556 **DP2_VISIT_DETECTOR_DATA_ID, 

557 ) 

558 

559 

560if __name__ == "__main__": 

561 unittest.main()