Coverage for tests / test_visit_image.py: 14%
405 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:27 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 08:27 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
16import warnings
17from typing import Any
19import astropy.io.fits
20import astropy.units as u
21import astropy.wcs
22import numpy as np
23from astro_metadata_translator import ObservationInfo
25from lsst.images import (
26 BackgroundMap,
27 Box,
28 DetectorFrame,
29 Image,
30 MaskPlane,
31 MaskSchema,
32 ObservationSummaryStats,
33 Polygon,
34 ProjectionAstropyView,
35 TractFrame,
36 VisitImage,
37 get_legacy_visit_image_mask_planes,
38)
39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy
40from lsst.images.cameras import Detector
41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib
42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
43from lsst.images.json import read as read_json
44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction
45from lsst.images.tests import (
46 DP2_VISIT_DETECTOR_DATA_ID,
47 RoundtripFits,
48 RoundtripNdf,
49 TemporaryButler,
50 assert_close,
51 assert_masked_images_equal,
52 assert_projections_equal,
53 compare_aperture_corrections_to_legacy,
54 compare_detector_to_legacy,
55 compare_photo_calib_to_legacy,
56 compare_visit_image_to_legacy,
57 make_random_projection,
58)
60try:
61 import h5py # noqa: F401
63 HAVE_H5PY = True
64except ImportError:
65 HAVE_H5PY = False
67EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
68LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
71class VisitImageTestCase(unittest.TestCase):
72 """Basic Tests for VisitImage."""
74 @classmethod
75 def setUpClass(cls) -> None:
76 cls.rng = np.random.default_rng(500)
77 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096])
78 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")])
79 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4, physical_filter="r1")
80 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)
81 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13])
82 cls.aperture_corrections: ApertureCorrectionMap = {
83 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])),
84 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])),
85 }
86 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json"))
88 opaque = FitsOpaqueMetadata()
89 hdr = astropy.io.fits.Header()
90 with warnings.catch_warnings():
91 # Silence warnings about long keys becoming HIERARCH.
92 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
93 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"})
94 opaque.extract_legacy_primary_header(hdr)
96 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy)
97 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy)
98 # polygon is the lower triangle of the image.
99 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5])
100 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096])
101 # API signature suggests projection and obs_info can be None but they
102 # are required (unless you pass them in via the image plane).
103 cls.visit_image = VisitImage(
104 cls.image,
105 variance=cls.variance,
106 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
107 mask_schema=cls.mask_schema,
108 projection=cls.projection,
109 obs_info=cls.obs_info,
110 summary_stats=cls.summary_stats,
111 detector=cls.detector,
112 bounds=cls.polygon,
113 aperture_corrections=cls.aperture_corrections,
114 band="r",
115 )
116 cls.visit_image.backgrounds.add(
117 "standard",
118 ChebyshevField(det_frame.bbox, np.array([[2.0]])),
119 description="Background subtracted from the image.",
120 is_subtracted=True,
121 )
122 cls.visit_image._opaque_metadata = opaque
123 cls.simplest_visit_image = VisitImage(
124 cls.image,
125 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
126 mask_schema=cls.mask_schema,
127 projection=cls.projection,
128 detector=cls.detector,
129 obs_info=cls.obs_info,
130 band="r",
131 )
133 def test_basics(self) -> None:
134 """Test basic constructor patterns."""
135 # Test default fill of variance.
136 visit = self.simplest_visit_image
137 self.assertEqual(visit.variance.array[0, 0], 1.0)
138 self.assertIs(visit[...], visit)
139 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])")
140 self.assertEqual(
141 repr(visit),
142 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024)),"
143 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')],"
144 " dtype=dtype('uint8')))",
145 )
147 astropy_wcs = visit.astropy_wcs
148 self.assertIsInstance(astropy_wcs, ProjectionAstropyView)
149 approx_wcs = visit.fits_wcs
150 self.assertIsInstance(approx_wcs, astropy.wcs.WCS)
152 with self.assertRaises(TypeError):
153 # Requires a PSF.
154 VisitImage(
155 self.image,
156 mask_schema=self.mask_schema,
157 projection=self.projection,
158 obs_info=self.obs_info,
159 detector=self.detector,
160 band="r",
161 )
163 with self.assertRaises(TypeError):
164 # Requires ObservationInfo.
165 VisitImage(
166 self.image,
167 psf=self.gaussian_psf,
168 mask_schema=self.mask_schema,
169 projection=self.projection,
170 detector=self.detector,
171 band="r",
172 )
174 with self.assertRaises(TypeError):
175 # Requires a projection.
176 VisitImage(
177 self.image,
178 psf=self.gaussian_psf,
179 mask_schema=self.mask_schema,
180 obs_info=self.obs_info,
181 detector=self.detector,
182 band="r",
183 )
185 with self.assertRaises(TypeError):
186 # Requires a detector.
187 VisitImage(
188 self.image,
189 psf=self.gaussian_psf,
190 mask_schema=self.mask_schema,
191 projection=self.projection,
192 obs_info=self.obs_info,
193 band="r",
194 )
196 with self.assertRaises(TypeError):
197 # Requires some form of mask.
198 VisitImage(
199 self.image,
200 psf=self.gaussian_psf,
201 projection=self.projection,
202 obs_info=self.obs_info,
203 detector=self.detector,
204 band="r",
205 )
207 with self.assertRaises(TypeError):
208 VisitImage(
209 Image(42, shape=(5, 5)),
210 psf=self.gaussian_psf,
211 mask_schema=self.mask_schema,
212 projection=self.projection,
213 obs_info=self.obs_info,
214 detector=self.detector,
215 band="r",
216 )
218 # Requires a DetectorFrame.
219 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10])
220 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096])
221 with self.assertRaises(TypeError):
222 VisitImage(
223 self.image,
224 projection=tract_proj,
225 psf=self.gaussian_psf,
226 mask_schema=self.mask_schema,
227 obs_info=self.obs_info,
228 detector=self.detector,
229 band="r",
230 )
232 # Variance unit mismatch.
233 with self.assertRaises(ValueError):
234 VisitImage(
235 self.image,
236 variance=self.image,
237 psf=self.gaussian_psf,
238 mask_schema=self.mask_schema,
239 projection=self.projection,
240 obs_info=self.obs_info,
241 detector=self.detector,
242 band="r",
243 )
245 def test_copy_and_slice(self) -> None:
246 """Test that arrays and components are copied (when not immutable) by
247 'copy' and referenced by 'slice'.
248 """
249 visit = self.visit_image
250 copy = visit.copy()
251 copy.image.array[0, 0] = 30.0
252 self.assertEqual(visit.image.array[0, 0], 42.0)
253 self.assertEqual(copy.image.array[0, 0], 30.0)
254 subvisit = visit[Box.factory[0:5, 0:5]]
255 # Check summary stats.
256 self.assertEqual(copy.summary_stats, visit.summary_stats)
257 self.assertIsNot(copy.summary_stats, visit.summary_stats)
258 self.assertEqual(subvisit.summary_stats, visit.summary_stats)
259 self.assertIs(subvisit.summary_stats, visit.summary_stats)
260 # Check aperture corrections.
261 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys())
262 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections)
263 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys())
264 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections)
265 # Check backgrounds.
266 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys())
267 self.assertIsNot(copy.backgrounds, visit.backgrounds)
268 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys())
269 self.assertIs(subvisit.backgrounds, visit.backgrounds)
270 # Check bounds.
271 self.assertIs(copy.bounds, self.polygon)
272 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox
274 def test_obs_info(self) -> None:
275 """Check that ObservationInfo has been constructed."""
276 visit = self.visit_image
277 self.assertIsNotNone(visit.obs_info)
278 self.maxDiff = None
279 assert visit.obs_info is not None # for mypy.
280 self.assertEqual(visit.obs_info.instrument, "LSSTCam")
282 def test_summary_stats(self) -> None:
283 """Test the comparisons and attributes of ObservationSummaryStats."""
284 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4))
285 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5))
286 self.assertNotEqual(
287 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2))
288 )
290 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
291 def test_round_trip_ndf(self):
292 """NDF round-trip for VisitImage."""
293 with RoundtripNdf(self, self.visit_image) as roundtrip:
294 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
295 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats)
296 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf))
298 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
299 def test_fits_ndf_consistency(self):
300 """FITS and NDF backends produce equal VisitImages on round-trip."""
301 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt:
302 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False)
303 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False)
304 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
306 def test_read_write(self) -> None:
307 """Test that a visit can round trip through a FITS file."""
308 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
309 # Check that we're still using the right compression, and that we
310 # wrote WCSs.
311 fits = roundtrip.inspect()
312 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
313 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN")
314 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
315 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN")
316 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
317 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN")
318 # Check a subimage read.
319 subbox = Box.factory[8:13, 9:30]
320 subimage = roundtrip.get(bbox=subbox)
321 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
322 with self.subTest():
323 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
324 with self.subTest():
325 obs_info = roundtrip.get("obs_info")
326 self.assertIsInstance(obs_info, ObservationInfo)
327 self.assertEqual(obs_info, self.visit_image.obs_info)
328 with self.subTest():
329 summary_stats = roundtrip.get("summary_stats")
330 self.assertIsInstance(summary_stats, ObservationSummaryStats)
331 self.assertEqual(summary_stats, self.visit_image.summary_stats)
332 with self.subTest():
333 psf = roundtrip.get("psf")
334 self.assertIsInstance(psf, GaussianPointSpreadFunction)
335 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox)
336 with self.subTest():
337 backgrounds = roundtrip.get("backgrounds")
338 self.assertIsInstance(backgrounds, BackgroundMap)
339 self.assertEqual(backgrounds.keys(), {"standard"})
340 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField)
341 self.assertEqual(backgrounds.subtracted.name, "standard")
342 self.assertEqual(
343 roundtrip.result.backgrounds.subtracted.description,
344 "Background subtracted from the image.",
345 )
347 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
348 # Check that the round-tripped headers are the same (up to card order).
349 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1)
350 self.assertEqual(
351 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
352 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
353 )
354 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
355 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
356 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
357 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info)
358 self.assertIsNotNone(roundtrip.result.summary_stats)
359 self.assertEqual(
360 roundtrip.result.summary_stats.psfSigma,
361 self.visit_image.summary_stats.psfSigma,
362 )
363 self.assertEqual(
364 roundtrip.result.summary_stats.zeroPoint,
365 self.visit_image.summary_stats.zeroPoint,
366 )
367 self.assertEqual(roundtrip.result.bounds, self.polygon)
368 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap)
369 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"})
370 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField)
371 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard")
372 self.assertEqual(
373 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image."
374 )
377class VisitImageLegacyTestMixin:
378 """Tests for the VisitImage class and the basics of the archive, to be
379 specialized for a particular test image.
381 `setUp` or `setUpClass` must be implemented to set the attributes declared
382 in the class.
383 """
385 filename: str
386 legacy_exposure: Any
387 plane_map: dict[str, MaskPlane]
388 visit_image: VisitImage
389 unit: u.UnitBase
391 def test_legacy_errors(self) -> None:
392 """Legacy read failure modes."""
393 with self.assertRaises(ValueError):
394 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC")
395 with self.assertRaises(ValueError):
396 VisitImage.from_legacy(self.legacy_exposure, visit=123456)
397 with self.assertRaises(ValueError):
398 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy)
399 visit = VisitImage.from_legacy(
400 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177
401 )
402 self.assertEqual(visit.unit, self.unit)
404 with self.assertRaises(ValueError):
405 VisitImage.read_legacy(self.filename, instrument="HSC")
406 with self.assertRaises(ValueError):
407 VisitImage.read_legacy(self.filename, visit=123456)
409 def test_component_reads(self) -> None:
410 """Test reads of components from legacy file."""
411 visit = VisitImage.read_legacy(self.filename)
412 proj = VisitImage.read_legacy(self.filename, component="projection")
413 assert_projections_equal(self, proj, visit.projection, expect_identity=False)
414 image = VisitImage.read_legacy(self.filename, component="image")
415 self.assertEqual(image, visit.image)
416 assert_projections_equal(self, proj, image.projection, expect_identity=False)
417 variance = VisitImage.read_legacy(self.filename, component="variance")
418 self.assertEqual(variance, visit.variance)
419 assert_projections_equal(self, proj, variance.projection, expect_identity=False)
420 mask = VisitImage.read_legacy(self.filename, component="mask")
421 self.assertEqual(mask, visit.mask)
422 assert_projections_equal(self, proj, mask.projection, expect_identity=False)
423 psf = VisitImage.read_legacy(self.filename, component="psf")
424 self.assertIsInstance(psf, PointSpreadFunction)
425 obs_info = VisitImage.read_legacy(self.filename, component="obs_info")
426 self.check_legacy_obs_info(obs_info)
427 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats")
428 self.assertIsInstance(summary_stats, ObservationSummaryStats)
429 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar)
430 compare_aperture_corrections_to_legacy(
431 self,
432 VisitImage.read_legacy(self.filename, component="aperture_corrections"),
433 self.legacy_exposure.info.getApCorrMap(),
434 visit.bbox,
435 )
436 detector = VisitImage.read_legacy(self.filename, component="detector")
437 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True)
438 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling")
439 compare_photo_calib_to_legacy(
440 self,
441 photometric_scaling,
442 self.legacy_exposure.getPhotoCalib(),
443 subimage_bbox=visit.bbox,
444 )
446 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None:
447 """Check that an `ObservationInfo` instance is not `None`, and that it
448 matches the one in the legacy test data file.
449 """
450 self.assertIsInstance(obs_info, ObservationInfo)
451 self.assertEqual(obs_info.instrument, "LSSTCam")
452 self.assertEqual(obs_info.detector_num, 85, obs_info)
453 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info)
454 self.assertEqual(obs_info.physical_filter, "r_57", obs_info)
456 def test_obs_info(self) -> None:
457 """Check that ObservationInfo has been constructed."""
458 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
459 self.assertIsNotNone(legacy.obs_info)
460 self.maxDiff = None
461 self.assertEqual(legacy.obs_info, self.visit_image.obs_info)
462 assert legacy.obs_info is not None # for mypy.
463 self.assertEqual(legacy.obs_info.instrument, "LSSTCam")
464 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info)
465 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info)
466 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info)
468 def test_aperture_corrections_to_legacy(self) -> None:
469 """Test that we can convert an aperture correction map back to a
470 legacy `lsst.afw.image.ApCorrMap`.
471 """
472 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections)
473 compare_aperture_corrections_to_legacy(
474 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox
475 )
477 def test_read_legacy_headers(self) -> None:
478 """Test that headers were correctly stripped and interpreted in
479 `VisitImage.read_legacy`.
480 """
481 # Check that we read the units from BUNIT.
482 self.assertEqual(self.visit_image.unit, self.unit)
483 # Check that the primary header has the keys we want, and none of the
484 # keys we don't want.
485 header = self.visit_image._opaque_metadata.headers[ExtensionKey()]
486 self.assertIn("EXPTIME", header)
487 self.assertEqual(header["PLATFORM"], "lsstcam")
488 self.assertNotIn("LSST BUTLER ID", header)
489 self.assertNotIn("AR HDU", header)
490 self.assertNotIn("A_ORDER", header)
491 # Check that the extension HDUs do not have any custom headers.
492 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
493 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
494 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
496 def test_from_legacy_headers(self) -> None:
497 """Test that from_legacy handles headers properly."""
498 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
499 header = legacy._opaque_metadata.headers[ExtensionKey()]
500 self.assertIn("EXPTIME", header)
501 self.assertEqual(header["PLATFORM"], "lsstcam")
502 self.assertNotIn("LSST BUTLER ID", header)
503 self.assertNotIn("AR HDU", header)
504 self.assertNotIn("A_ORDER", header)
505 # Check that the extension HDUs do not have any custom headers.
506 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
507 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
508 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
510 def test_rewrite(self) -> None:
511 """Test that we can rewrite the visit image and preserve both
512 lossy-compressed pixel values and components exactly.
513 """
514 import lsst.afw.image
516 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
517 # Check that we're still using the right compression, and that we
518 # wrote WCSs.
519 fits = roundtrip.inspect()
520 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
521 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP")
522 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
523 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP")
524 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
525 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP")
526 # Check a subimage read.
527 subbox = Box.factory[8:13, 9:30]
528 subimage = roundtrip.get(bbox=subbox)
529 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
530 alternates: dict[str, Any] = {}
531 with self.subTest():
532 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
533 alternates = {
534 k: roundtrip.get(k)
535 for k in [
536 "projection",
537 "image",
538 "mask",
539 "variance",
540 "psf",
541 "obs_info",
542 "summary_stats",
543 "aperture_corrections",
544 "detector",
545 "photometric_scaling",
546 ]
547 }
548 # Test reading back in as an Exposure.
549 with self.subTest():
550 legacy_exposure = roundtrip.get(storageClass="Exposure")
551 self.assertIsInstance(legacy_exposure, lsst.afw.image.Exposure)
552 # This covers most of the compnents, which have clean 1-1
553 # mappings from legacy to new:
554 compare_visit_image_to_legacy(
555 self,
556 self.visit_image,
557 legacy_exposure,
558 expect_view=False,
559 plane_map=self.plane_map,
560 **DP2_VISIT_DETECTOR_DATA_ID,
561 )
562 # A few components are different enough to merit extra
563 # attention:
564 if self.visit_image.unit == u.nJy:
565 self.assertTrue(legacy_exposure.getPhotoCalib()._isConstant)
566 self.assertEqual(legacy_exposure.getPhotoCalib().getCalibrationMean(), 1.0)
567 else:
568 compare_photo_calib_to_legacy(
569 self,
570 self.visit_image.photometric_scaling,
571 legacy_exposure.getPhotoCalib(),
572 subimage_bbox=subbox,
573 )
574 self.assertEqual(legacy_exposure.info.getId(), self.legacy_exposure.info.getId())
575 # Try to do a butler get of a component with storage class
576 # override.
577 with self.subTest():
578 # We have VisitInfo available.
579 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo")
580 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo)
581 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam")
583 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
584 # Check that the round-tripped headers are the same (up to card order).
585 self.assertEqual(
586 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
587 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
588 )
589 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
590 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
591 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
592 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam")
593 compare_visit_image_to_legacy(
594 self,
595 roundtrip.result,
596 self.legacy_exposure,
597 expect_view=False,
598 plane_map=self.plane_map,
599 **DP2_VISIT_DETECTOR_DATA_ID,
600 alternates=alternates,
601 )
602 # Check converting from the legacy object in-memory.
603 compare_visit_image_to_legacy(
604 self,
605 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map),
606 self.legacy_exposure,
607 expect_view=True,
608 plane_map=self.plane_map,
609 **DP2_VISIT_DETECTOR_DATA_ID,
610 )
612 def test_butler_converters(self) -> None:
613 """Test that we can read a VisitImage and its components from a butler
614 dataset written as an `lsst.afw.image.Exposure`.
615 """
616 if self.legacy_exposure is None:
617 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.")
618 with TemporaryButler(legacy="ExposureF") as helper:
619 from lsst.daf.butler import FileDataset
621 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink")
622 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage")
623 with warnings.catch_warnings():
624 # Silence warnings about data ID and filter label disagreeing.
625 warnings.simplefilter("ignore", category=UserWarning)
626 visit_image = helper.butler.get(visit_image_ref)
627 # We didn't ask for the quantization to be preserved, so it
628 # shouldn't be.
629 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), set())
630 # This time preserve the quantization
631 visit_image = helper.butler.get(visit_image_ref, parameters={"preserve_quantization": True})
632 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), {"IMAGE", "VARIANCE"})
633 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox"))
634 self.assertEqual(bbox, visit_image.bbox)
635 alternates = {
636 k: helper.butler.get(visit_image_ref.makeComponentRef(k))
637 # TODO: including "projection" or "obs_info" here fails because
638 # there's code in daf_butler that expects any component to be
639 # valid for the *internal* storage class, not the requested
640 # one, and that's difficult to fix because it's tied up with
641 # the data ID standardization logic.
642 for k in ["image", "mask", "variance", "bbox", "psf", "detector"]
643 }
644 compare_visit_image_to_legacy(
645 self,
646 visit_image,
647 self.legacy_exposure,
648 expect_view=False,
649 plane_map=self.plane_map,
650 alternates=alternates,
651 **DP2_VISIT_DETECTOR_DATA_ID,
652 )
653 # Add some metadata to the new VisitImage and then do a converting
654 # `put` that should write to the old format (we have to delete the
655 # old one first, which just deletes a symlink).
656 helper.butler.pruneDatasets([helper.legacy], purge=True, unstore=True, disassociate=True)
657 visit_image.metadata["MixedCaseKey"] = 52
658 helper.butler.put(visit_image, visit_image_ref)
659 # Check that we can read *that* back in as a legacy exposure.
660 legacy_exposure = helper.butler.get(helper.legacy)
661 compare_visit_image_to_legacy(
662 self,
663 visit_image,
664 legacy_exposure,
665 expect_view=False,
666 plane_map=self.plane_map,
667 alternates=alternates,
668 **DP2_VISIT_DETECTOR_DATA_ID,
669 )
670 # Check that we can read it back in as a VisitImage, and that the
671 # new metadata is preserved.
672 visit_image_2 = helper.butler.get(visit_image_ref)
673 compare_visit_image_to_legacy(
674 self,
675 visit_image_2,
676 legacy_exposure,
677 expect_view=False,
678 plane_map=self.plane_map,
679 alternates=alternates,
680 **DP2_VISIT_DETECTOR_DATA_ID,
681 )
682 self.assertEqual(visit_image_2.metadata["MixedCaseKey"], 52)
685@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
686class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
687 """Tests for the VisitImage class using a DRP-final visit_image dataset.
689 Requires legacy code.
690 """
692 @classmethod
693 def setUpClass(cls) -> None:
694 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
695 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits")
696 try:
697 from lsst.afw.image import ExposureFitsReader
699 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
700 except ImportError:
701 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
702 cls.plane_map = get_legacy_visit_image_mask_planes()
703 cls.visit_image = VisitImage.read_legacy(
704 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
705 )
706 cls.unit = u.nJy
708 def test_convert_unit(self) -> None:
709 """Test using the ``photometric_scaling`` to swap between
710 calibrated and instrumental units.
712 This includes tests of the `VisitImage.to_legacy` logic for units that
713 don't map directly to `lsst.afw.image.PhotoCalib` conventions.
714 """
715 from lsst.afw.table import ExposureCatalog
717 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
718 # Make a copy of class state so we can modify it without breaking
719 # other tests.
720 original = self.visit_image.copy()
721 # We should not be able to convert to instrumental units when there is
722 # no photometric scaling.
723 with self.assertRaises(u.UnitConversionError):
724 original.convert_unit(u.electron)
725 # Converting to the current unit should be a no-op that does not need
726 # to copy.
727 visit_image_nJy = original.convert_unit(u.nJy, copy=False)
728 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, original.image.array))
729 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, original.variance.array))
730 # Even without a photometric_scaling attached, we should be able to
731 # convert to a compatible unit, but only if we allow copies.
732 with self.assertRaises(u.UnitConversionError):
733 original.convert_unit(u.mJy, copy=False)
734 visit_image_mJy = original.convert_unit(u.mJy, copy="as-needed")
735 self.assertEqual(visit_image_mJy.unit, u.mJy)
736 assert_close(self, visit_image_mJy.image.array, original.image.array * 1e-6)
737 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, original.mask.array))
738 assert_close(self, visit_image_mJy.variance.array, original.variance.array * 1e-12)
739 # Converting a mJy image to legacy should make a PhotoCalib that maps
740 # mJy to nJy.
741 legacy_exposure_mJy = visit_image_mJy.to_legacy()
742 assert_close(self, legacy_exposure_mJy.getPhotoCalib().getCalibrationMean(), 1e6)
743 legacy_masked_image_nJy = legacy_exposure_mJy.getPhotoCalib().calibrateImage(
744 legacy_exposure_mJy.maskedImage
745 )
746 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array)
747 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array)
748 # Test that we haven't dropped any component objects along the way,
749 # and that they're all still the same objects or thin views.
750 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, original.mask.array))
751 self.assertIs(visit_image_mJy.projection, original.projection)
752 self.assertIs(visit_image_mJy.obs_info, original.obs_info)
753 self.assertIs(visit_image_mJy.summary_stats, original.summary_stats)
754 self.assertIs(visit_image_mJy.psf, original.psf)
755 self.assertIs(visit_image_mJy.detector, original.detector)
756 self.assertIs(visit_image_mJy.bounds, original.bounds)
757 self.assertIs(visit_image_mJy.aperture_corrections, original.aperture_corrections)
758 self.assertIs(visit_image_mJy.photometric_scaling, original.photometric_scaling)
759 # Attach the final PhotoCalib (this isn't stored with the legacy file
760 # because that is the mapping to nJy, which is trivial).
761 visit_summary = ExposureCatalog.readFits(
762 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits")
763 )
764 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib()
765 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib(
766 legacy_photo_calib, bounds=original.detector.bbox, instrumental_unit=u.electron
767 )
768 compare_photo_calib_to_legacy(
769 self,
770 visit_image_nJy.photometric_scaling,
771 self.legacy_exposure.getPhotoCalib(),
772 applied_legacy_photo_calib=legacy_photo_calib,
773 subimage_bbox=visit_image_nJy.bbox,
774 )
775 # We still can't convert to completely unrelated units.
776 with self.assertRaises(u.UnitConversionError):
777 visit_image_nJy.convert_unit(u.mm)
778 # Uncalibrating via the photometric_scaling matches what legacy code
779 # does, and by default it copies everything.
780 with self.assertRaises(u.UnitConversionError):
781 visit_image_nJy.convert_unit(u.electron, copy=False)
782 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage)
783 visit_image_e = visit_image_nJy.convert_unit(u.electron)
784 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
785 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
786 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array))
787 # We can also uncalibrate if we start with an image that has units
788 # that are compatible with the photometric_scaling but not identical
789 # to it.
790 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling
791 visit_image_e = visit_image_mJy.convert_unit(u.electron)
792 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
793 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
794 # We can re-apply the scaling to go back to calibrated units.
795 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy)
796 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array)
797 assert_close(self, visit_image_nJy_2.variance.array, original.variance.array)
798 # Try calibrating an image with a scaling that has units other than
799 # nJy in the numerator.
800 visit_image_e.photometric_scaling = visit_image_nJy.photometric_scaling * (1e-6 * u.mJy / u.nJy)
801 visit_image_nJy_3 = visit_image_e.convert_unit(u.nJy)
802 assert_close(self, visit_image_nJy_3.image.array, visit_image_nJy.image.array)
803 assert_close(self, visit_image_nJy_3.variance.array, original.variance.array)
804 # Try converting that uncalibrated image to legacy; the extra mJy/nJy
805 # factor should get included in the PhotoCalib to recover the original
806 # PhotoCalib.
807 legacy_exposure_e = visit_image_e.to_legacy()
808 assert_close(
809 self,
810 legacy_exposure_e.getPhotoCalib().getCalibrationMean(),
811 legacy_photo_calib.getCalibrationMean(),
812 )
813 legacy_masked_image_nJy = legacy_exposure_e.getPhotoCalib().calibrateImage(
814 legacy_exposure_e.maskedImage
815 )
816 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array)
817 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array)
820@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
821class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
822 """Tests for the VisitImage class using a DRP preliminary_visit_image
823 dataset.
825 Requires legacy code.
826 """
828 @classmethod
829 def setUpClass(cls) -> None:
830 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
831 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits")
832 try:
833 from lsst.afw.image import ExposureFitsReader
835 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
836 except ImportError:
837 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
838 cls.plane_map = get_legacy_visit_image_mask_planes()
839 cls.visit_image = VisitImage.read_legacy(
840 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
841 )
842 cls.unit = u.electron
845if __name__ == "__main__":
846 unittest.main()