Coverage for tests / test_visit_image.py: 16%
289 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 00:52 -0700
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-16 00:52 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
16import warnings
17from typing import Any
19import astropy.io.fits
20import astropy.units as u
21import astropy.wcs
22import numpy as np
23from astro_metadata_translator import ObservationInfo
25from lsst.images import (
26 Box,
27 DetectorFrame,
28 Image,
29 MaskPlane,
30 MaskSchema,
31 ObservationSummaryStats,
32 Polygon,
33 ProjectionAstropyView,
34 TractFrame,
35 VisitImage,
36 get_legacy_visit_image_mask_planes,
37)
38from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy
39from lsst.images.cameras import Detector
40from lsst.images.fields import ChebyshevField
41from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
42from lsst.images.json import read as read_json
43from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction
44from lsst.images.tests import (
45 DP2_VISIT_DETECTOR_DATA_ID,
46 RoundtripFits,
47 RoundtripNdf,
48 TemporaryButler,
49 assert_masked_images_equal,
50 assert_projections_equal,
51 compare_aperture_corrections_to_legacy,
52 compare_detector_to_legacy,
53 compare_visit_image_to_legacy,
54 make_random_projection,
55)
57try:
58 import h5py # noqa: F401
60 HAVE_H5PY = True
61except ImportError:
62 HAVE_H5PY = False
64EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
65LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
68class VisitImageTestCase(unittest.TestCase):
69 """Basic Tests for VisitImage."""
71 @classmethod
72 def setUpClass(cls) -> None:
73 cls.rng = np.random.default_rng(500)
74 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096])
75 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")])
76 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4)
77 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)
78 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13])
79 cls.aperture_corrections: ApertureCorrectionMap = {
80 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])),
81 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])),
82 }
83 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json"))
85 opaque = FitsOpaqueMetadata()
86 hdr = astropy.io.fits.Header()
87 with warnings.catch_warnings():
88 # Silence warnings about long keys becoming HIERARCH.
89 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
90 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"})
91 opaque.extract_legacy_primary_header(hdr)
93 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy)
94 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy)
95 # polygon is the lower triangle of the image.
96 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5])
97 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096])
98 # API signature suggests projection and obs_info can be None but they
99 # are required (unless you pass them in via the image plane).
100 cls.visit_image = VisitImage(
101 cls.image,
102 variance=cls.variance,
103 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
104 mask_schema=cls.mask_schema,
105 projection=cls.projection,
106 obs_info=cls.obs_info,
107 summary_stats=cls.summary_stats,
108 detector=cls.detector,
109 bounds=cls.polygon,
110 aperture_corrections=cls.aperture_corrections,
111 )
112 cls.visit_image._opaque_metadata = opaque
113 cls.simplest_visit_image = VisitImage(
114 cls.image,
115 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
116 mask_schema=cls.mask_schema,
117 projection=cls.projection,
118 detector=cls.detector,
119 obs_info=cls.obs_info,
120 )
122 def test_basics(self) -> None:
123 """Test basic constructor patterns."""
124 # Test default fill of variance.
125 visit = self.simplest_visit_image
126 self.assertEqual(visit.variance.array[0, 0], 1.0)
127 self.assertIs(visit[...], visit)
128 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])")
129 self.assertEqual(
130 repr(visit),
131 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024)),"
132 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')],"
133 " dtype=dtype('uint8')))",
134 )
136 astropy_wcs = visit.astropy_wcs
137 self.assertIsInstance(astropy_wcs, ProjectionAstropyView)
138 approx_wcs = visit.fits_wcs
139 self.assertIsInstance(approx_wcs, astropy.wcs.WCS)
141 with self.assertRaises(TypeError):
142 # Requires a PSF.
143 VisitImage(
144 self.image,
145 mask_schema=self.mask_schema,
146 projection=self.projection,
147 obs_info=self.obs_info,
148 detector=self.detector,
149 )
151 with self.assertRaises(TypeError):
152 # Requires ObservationInfo.
153 VisitImage(
154 self.image,
155 psf=self.gaussian_psf,
156 mask_schema=self.mask_schema,
157 projection=self.projection,
158 detector=self.detector,
159 )
161 with self.assertRaises(TypeError):
162 # Requires a projection.
163 VisitImage(
164 self.image,
165 psf=self.gaussian_psf,
166 mask_schema=self.mask_schema,
167 obs_info=self.obs_info,
168 detector=self.detector,
169 )
171 with self.assertRaises(TypeError):
172 # Requires a detector.
173 VisitImage(
174 self.image,
175 psf=self.gaussian_psf,
176 mask_schema=self.mask_schema,
177 projection=self.projection,
178 obs_info=self.obs_info,
179 )
181 with self.assertRaises(TypeError):
182 # Requires some form of mask.
183 VisitImage(
184 self.image,
185 psf=self.gaussian_psf,
186 projection=self.projection,
187 obs_info=self.obs_info,
188 detector=self.detector,
189 )
191 with self.assertRaises(TypeError):
192 VisitImage(
193 Image(42, shape=(5, 5)),
194 psf=self.gaussian_psf,
195 mask_schema=self.mask_schema,
196 projection=self.projection,
197 obs_info=self.obs_info,
198 detector=self.detector,
199 )
201 # Requires a DetectorFrame.
202 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10])
203 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096])
204 with self.assertRaises(TypeError):
205 VisitImage(
206 self.image,
207 projection=tract_proj,
208 psf=self.gaussian_psf,
209 mask_schema=self.mask_schema,
210 obs_info=self.obs_info,
211 detector=self.detector,
212 )
214 # Variance unit mismatch.
215 with self.assertRaises(ValueError):
216 VisitImage(
217 self.image,
218 variance=self.image,
219 psf=self.gaussian_psf,
220 mask_schema=self.mask_schema,
221 projection=self.projection,
222 obs_info=self.obs_info,
223 detector=self.detector,
224 )
226 def test_copy_and_slice(self) -> None:
227 """Test that arrays and components are copied (when not immutable) by
228 'copy' and referenced by 'slice'.
229 """
230 visit = self.visit_image
231 copy = visit.copy()
232 copy.image.array[0, 0] = 30.0
233 self.assertEqual(visit.image.array[0, 0], 42.0)
234 self.assertEqual(copy.image.array[0, 0], 30.0)
235 subvisit = visit[Box.factory[0:5, 0:5]]
236 # Check summary stats.
237 self.assertEqual(copy.summary_stats, visit.summary_stats)
238 self.assertIsNot(copy.summary_stats, visit.summary_stats)
239 self.assertEqual(subvisit.summary_stats, visit.summary_stats)
240 self.assertIs(subvisit.summary_stats, visit.summary_stats)
241 # Check aperture corrections.
242 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys())
243 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections)
244 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys())
245 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections)
246 # Check bounds.
247 self.assertIs(copy.bounds, self.polygon)
248 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox
250 def test_obs_info(self) -> None:
251 """Check that ObservationInfo has been constructed."""
252 visit = self.visit_image
253 self.assertIsNotNone(visit.obs_info)
254 self.maxDiff = None
255 assert visit.obs_info is not None # for mypy.
256 self.assertEqual(visit.obs_info.instrument, "LSSTCam")
258 def test_summary_stats(self) -> None:
259 """Test the comparisons and attributes of ObservationSummaryStats."""
260 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4))
261 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5))
262 self.assertNotEqual(
263 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2))
264 )
266 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
267 def test_round_trip_ndf(self):
268 """NDF round-trip for VisitImage."""
269 with RoundtripNdf(self, self.visit_image) as roundtrip:
270 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
271 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats)
272 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf))
274 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
275 def test_fits_ndf_consistency(self):
276 """FITS and NDF backends produce equal VisitImages on round-trip."""
277 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt:
278 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False)
279 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False)
280 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
282 def test_read_write(self) -> None:
283 """Test that a visit can round trip through a FITS file."""
284 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
285 # Check that we're still using the right compression, and that we
286 # wrote WCSs.
287 fits = roundtrip.inspect()
288 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
289 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN")
290 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
291 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN")
292 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
293 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN")
294 # Check a subimage read.
295 subbox = Box.factory[8:13, 9:30]
296 subimage = roundtrip.get(bbox=subbox)
297 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
298 with self.subTest():
299 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
300 with self.subTest():
301 obs_info = roundtrip.get("obs_info")
302 self.assertIsInstance(obs_info, ObservationInfo)
303 self.assertEqual(obs_info, self.visit_image.obs_info)
304 with self.subTest():
305 summary_stats = roundtrip.get("summary_stats")
306 self.assertIsInstance(summary_stats, ObservationSummaryStats)
307 self.assertEqual(summary_stats, self.visit_image.summary_stats)
308 with self.subTest():
309 psf = roundtrip.get("psf")
310 self.assertIsInstance(psf, GaussianPointSpreadFunction)
311 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox)
313 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
314 # Check that the round-tripped headers are the same (up to card order).
315 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1)
316 self.assertEqual(
317 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
318 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
319 )
320 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
321 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
322 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
323 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info)
324 self.assertIsNotNone(roundtrip.result.summary_stats)
325 self.assertEqual(
326 roundtrip.result.summary_stats.psfSigma,
327 self.visit_image.summary_stats.psfSigma,
328 )
329 self.assertEqual(
330 roundtrip.result.summary_stats.zeroPoint,
331 self.visit_image.summary_stats.zeroPoint,
332 )
333 self.assertEqual(roundtrip.result.bounds, self.polygon)
336@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
337class VisitImageLegacyTestCase(unittest.TestCase):
338 """Tests for the VisitImage class and the basics of the archive system.
340 Requires legacy code.
341 """
343 @classmethod
344 def setUpClass(cls) -> None:
345 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
346 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits")
347 try:
348 from lsst.afw.image import ExposureFitsReader
350 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
351 except ImportError:
352 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
353 cls.plane_map = plane_map = get_legacy_visit_image_mask_planes()
354 cls.visit_image = VisitImage.read_legacy(
355 cls.filename, preserve_quantization=True, plane_map=plane_map
356 )
358 def test_legacy_errors(self) -> None:
359 """Legacy read failure modes."""
360 with self.assertRaises(ValueError):
361 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC")
362 with self.assertRaises(ValueError):
363 VisitImage.from_legacy(self.legacy_exposure, visit=123456)
364 with self.assertRaises(ValueError):
365 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy)
366 visit = VisitImage.from_legacy(
367 self.legacy_exposure, instrument="LSSTCam", unit=u.nJy, visit=2025052000177
368 )
369 self.assertEqual(visit.unit, u.nJy)
371 with self.assertRaises(ValueError):
372 VisitImage.read_legacy(self.filename, instrument="HSC")
373 with self.assertRaises(ValueError):
374 VisitImage.read_legacy(self.filename, visit=123456)
376 def test_component_reads(self) -> None:
377 """Test reads of components from legacy file."""
378 visit = VisitImage.read_legacy(self.filename)
379 proj = VisitImage.read_legacy(self.filename, component="projection")
380 assert_projections_equal(self, proj, visit.projection, expect_identity=False)
381 image = VisitImage.read_legacy(self.filename, component="image")
382 self.assertEqual(image, visit.image)
383 self.check_legacy_obs_info(image.obs_info)
384 assert_projections_equal(self, proj, image.projection, expect_identity=False)
385 variance = VisitImage.read_legacy(self.filename, component="variance")
386 self.assertEqual(variance, visit.variance)
387 assert_projections_equal(self, proj, variance.projection, expect_identity=False)
388 self.check_legacy_obs_info(variance.obs_info)
389 mask = VisitImage.read_legacy(self.filename, component="mask")
390 self.assertEqual(mask, visit.mask)
391 assert_projections_equal(self, proj, mask.projection, expect_identity=False)
392 self.check_legacy_obs_info(mask.obs_info)
393 psf = VisitImage.read_legacy(self.filename, component="psf")
394 self.assertIsInstance(psf, PointSpreadFunction)
395 obs_info = VisitImage.read_legacy(self.filename, component="obs_info")
396 self.check_legacy_obs_info(obs_info)
397 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats")
398 self.assertIsInstance(summary_stats, ObservationSummaryStats)
399 self.assertEqual(summary_stats.nPsfStar, 93)
400 compare_aperture_corrections_to_legacy(
401 self,
402 VisitImage.read_legacy(self.filename, component="aperture_corrections"),
403 self.legacy_exposure.info.getApCorrMap(),
404 visit.bbox,
405 )
406 detector = VisitImage.read_legacy(self.filename, component="detector")
407 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True)
409 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None:
410 """Check that an `ObservationInfo` instance is not `None`, and that it
411 matches the one in the legacy test data file.
412 """
413 self.assertIsInstance(obs_info, ObservationInfo)
414 self.assertEqual(obs_info.instrument, "LSSTCam")
415 self.assertEqual(obs_info.detector_num, 85, obs_info)
416 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info)
417 self.assertEqual(obs_info.physical_filter, "r_57", obs_info)
419 def test_obs_info(self) -> None:
420 """Check that ObservationInfo has been constructed."""
421 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
422 self.assertIsNotNone(legacy.obs_info)
423 self.maxDiff = None
424 self.assertEqual(legacy.obs_info, self.visit_image.obs_info)
425 assert legacy.obs_info is not None # for mypy.
426 self.assertEqual(legacy.obs_info.instrument, "LSSTCam")
427 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info)
428 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info)
429 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info)
431 def test_aperture_corrections_to_legacy(self) -> None:
432 """Test that we can convert an aperture correction map back to a
433 legacy `lsst.afw.image.ApCorrMap`.
434 """
435 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections)
436 compare_aperture_corrections_to_legacy(
437 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox
438 )
440 def test_read_legacy_headers(self) -> None:
441 """Test that headers were correctly stripped and interpreted in
442 `VisitImage.read_legacy`.
443 """
444 # Check that we read the units from BUNIT.
445 self.assertEqual(self.visit_image.unit, astropy.units.nJy)
446 # Check that the primary header has the keys we want, and none of the
447 # keys we don't want.
448 header = self.visit_image._opaque_metadata.headers[ExtensionKey()]
449 self.assertIn("EXPTIME", header)
450 self.assertEqual(header["PLATFORM"], "lsstcam")
451 self.assertNotIn("LSST BUTLER ID", header)
452 self.assertNotIn("AR HDU", header)
453 self.assertNotIn("A_ORDER", header)
454 # Check that the extension HDUs do not have any custom headers.
455 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
456 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
457 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
459 def test_from_legacy_headers(self) -> None:
460 """Test that from_legacy handles headers properly."""
461 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
462 header = legacy._opaque_metadata.headers[ExtensionKey()]
463 self.assertIn("EXPTIME", header)
464 self.assertEqual(header["PLATFORM"], "lsstcam")
465 self.assertNotIn("LSST BUTLER ID", header)
466 self.assertNotIn("AR HDU", header)
467 self.assertNotIn("A_ORDER", header)
468 # Check that the extension HDUs do not have any custom headers.
469 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
470 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
471 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
473 def test_rewrite(self) -> None:
474 """Test that we can rewrite the visit image and preserve both
475 lossy-compressed pixel values and components exactly.
476 """
477 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
478 # Check that we're still using the right compression, and that we
479 # wrote WCSs.
480 fits = roundtrip.inspect()
481 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
482 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP")
483 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
484 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP")
485 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
486 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP")
487 # Check a subimage read.
488 subbox = Box.factory[8:13, 9:30]
489 subimage = roundtrip.get(bbox=subbox)
490 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
491 alternates: dict[str, Any] = {}
492 with self.subTest():
493 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
494 alternates = {
495 k: roundtrip.get(k)
496 for k in [
497 "projection",
498 "image",
499 "mask",
500 "variance",
501 "psf",
502 "obs_info",
503 "summary_stats",
504 "aperture_corrections",
505 "detector",
506 ]
507 }
508 # Try to do a butler get of a component with storage class
509 # override.
510 with self.subTest():
511 if self.legacy_exposure is not None:
512 import lsst.afw.image
514 # We have VisitInfo available.
515 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo")
516 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo)
517 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam")
518 else:
519 raise unittest.SkipTest("Can not test VisitInfo conversion without afw")
521 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
522 # Check that the round-tripped headers are the same (up to card order).
523 self.assertEqual(
524 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
525 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
526 )
527 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
528 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
529 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
530 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam")
531 compare_visit_image_to_legacy(
532 self,
533 roundtrip.result,
534 self.legacy_exposure,
535 expect_view=False,
536 plane_map=self.plane_map,
537 **DP2_VISIT_DETECTOR_DATA_ID,
538 alternates=alternates,
539 )
540 # Check converting from the legacy object in-memory.
541 compare_visit_image_to_legacy(
542 self,
543 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map),
544 self.legacy_exposure,
545 expect_view=True,
546 plane_map=self.plane_map,
547 **DP2_VISIT_DETECTOR_DATA_ID,
548 )
550 def test_butler_converters(self) -> None:
551 """Test that we can read a VisitImage and its components from a butler
552 dataset written as an `lsst.afw.image.Exposure`.
553 """
554 if self.legacy_exposure is None:
555 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.")
556 with TemporaryButler(legacy="ExposureF") as helper:
557 from lsst.daf.butler import FileDataset
559 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink")
560 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage")
561 visit_image = helper.butler.get(visit_image_ref)
562 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox"))
563 self.assertEqual(bbox, visit_image.bbox)
564 alternates = {
565 k: helper.butler.get(visit_image_ref.makeComponentRef(k))
566 # TODO: including "projection" or "obs_info" here fails because
567 # there's code in daf_butler that expects any component to be
568 # valid for the *internal* storage class, not the requested
569 # one, and that's difficult to fix because it's tied up with
570 # the data ID standardization logic.
571 for k in ["image", "mask", "variance", "psf", "detector"]
572 }
573 compare_visit_image_to_legacy(
574 self,
575 visit_image,
576 self.legacy_exposure,
577 expect_view=False,
578 plane_map=self.plane_map,
579 alternates=alternates,
580 **DP2_VISIT_DETECTOR_DATA_ID,
581 )
584if __name__ == "__main__":
585 unittest.main()