Coverage for tests/test_visit_image.py: 15%
403 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:48 -0700
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-29 01:48 -0700
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
16import warnings
17from typing import Any
19import astropy.io.fits
20import astropy.units as u
21import astropy.wcs
22import numpy as np
23from astro_metadata_translator import ObservationInfo
25from lsst.images import (
26 BackgroundMap,
27 Box,
28 DetectorFrame,
29 Image,
30 MaskPlane,
31 MaskSchema,
32 ObservationSummaryStats,
33 Polygon,
34 ProjectionAstropyView,
35 TractFrame,
36 VisitImage,
37 get_legacy_visit_image_mask_planes,
38)
39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy
40from lsst.images.cameras import Detector
41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib
42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
43from lsst.images.json import read as read_json
44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction
45from lsst.images.tests import (
46 DP2_VISIT_DETECTOR_DATA_ID,
47 RoundtripFits,
48 RoundtripJson,
49 RoundtripNdf,
50 TemporaryButler,
51 assert_close,
52 assert_masked_images_equal,
53 assert_projections_equal,
54 assert_visit_images_equal,
55 compare_aperture_corrections_to_legacy,
56 compare_detector_to_legacy,
57 compare_photo_calib_to_legacy,
58 compare_visit_image_to_legacy,
59 make_random_projection,
60)
62try:
63 import h5py # noqa: F401
65 HAVE_H5PY = True
66except ImportError:
67 HAVE_H5PY = False
69EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
70LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
73class VisitImageTestCase(unittest.TestCase):
74 """Basic Tests for VisitImage."""
76 @classmethod
77 def setUpClass(cls) -> None:
78 cls.rng = np.random.default_rng(500)
79 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096])
80 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")])
81 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4, physical_filter="r1")
82 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)
83 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13])
84 cls.aperture_corrections: ApertureCorrectionMap = {
85 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])),
86 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])),
87 }
88 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json"))
90 opaque = FitsOpaqueMetadata()
91 hdr = astropy.io.fits.Header()
92 with warnings.catch_warnings():
93 # Silence warnings about long keys becoming HIERARCH.
94 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
95 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"})
96 opaque.extract_legacy_primary_header(hdr)
98 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy)
99 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy)
100 # polygon is the lower triangle of the image.
101 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5])
102 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096])
103 # API signature suggests projection and obs_info can be None but they
104 # are required (unless you pass them in via the image plane).
105 cls.visit_image = VisitImage(
106 cls.image,
107 variance=cls.variance,
108 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
109 mask_schema=cls.mask_schema,
110 projection=cls.projection,
111 obs_info=cls.obs_info,
112 summary_stats=cls.summary_stats,
113 detector=cls.detector,
114 bounds=cls.polygon,
115 aperture_corrections=cls.aperture_corrections,
116 band="r",
117 )
118 cls.visit_image.backgrounds.add(
119 "standard",
120 ChebyshevField(det_frame.bbox, np.array([[2.0]])),
121 description="Background subtracted from the image.",
122 is_subtracted=True,
123 )
124 cls.visit_image._opaque_metadata = opaque
125 cls.simplest_visit_image = VisitImage(
126 cls.image,
127 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
128 mask_schema=cls.mask_schema,
129 projection=cls.projection,
130 detector=cls.detector,
131 obs_info=cls.obs_info,
132 band="r",
133 )
135 def test_basics(self) -> None:
136 """Test basic constructor patterns."""
137 # Test default fill of variance.
138 visit = self.simplest_visit_image
139 self.assertEqual(visit.variance.array[0, 0], 1.0)
140 self.assertIs(visit[...], visit)
141 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])")
142 self.assertEqual(
143 repr(visit),
144 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024)),"
145 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')],"
146 " dtype=dtype('uint8')))",
147 )
149 astropy_wcs = visit.astropy_wcs
150 self.assertIsInstance(astropy_wcs, ProjectionAstropyView)
151 approx_wcs = visit.fits_wcs
152 self.assertIsInstance(approx_wcs, astropy.wcs.WCS)
154 with self.assertRaises(TypeError):
155 # Requires a PSF.
156 VisitImage(
157 self.image,
158 mask_schema=self.mask_schema,
159 projection=self.projection,
160 obs_info=self.obs_info,
161 detector=self.detector,
162 band="r",
163 )
165 with self.assertRaises(TypeError):
166 # Requires ObservationInfo.
167 VisitImage(
168 self.image,
169 psf=self.gaussian_psf,
170 mask_schema=self.mask_schema,
171 projection=self.projection,
172 detector=self.detector,
173 band="r",
174 )
176 with self.assertRaises(TypeError):
177 # Requires a projection.
178 VisitImage(
179 self.image,
180 psf=self.gaussian_psf,
181 mask_schema=self.mask_schema,
182 obs_info=self.obs_info,
183 detector=self.detector,
184 band="r",
185 )
187 with self.assertRaises(TypeError):
188 # Requires a detector.
189 VisitImage(
190 self.image,
191 psf=self.gaussian_psf,
192 mask_schema=self.mask_schema,
193 projection=self.projection,
194 obs_info=self.obs_info,
195 band="r",
196 )
198 with self.assertRaises(TypeError):
199 # Requires some form of mask.
200 VisitImage(
201 self.image,
202 psf=self.gaussian_psf,
203 projection=self.projection,
204 obs_info=self.obs_info,
205 detector=self.detector,
206 band="r",
207 )
209 with self.assertRaises(TypeError):
210 VisitImage(
211 Image(42, shape=(5, 5)),
212 psf=self.gaussian_psf,
213 mask_schema=self.mask_schema,
214 projection=self.projection,
215 obs_info=self.obs_info,
216 detector=self.detector,
217 band="r",
218 )
220 # Requires a DetectorFrame.
221 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10])
222 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096])
223 with self.assertRaises(TypeError):
224 VisitImage(
225 self.image,
226 projection=tract_proj,
227 psf=self.gaussian_psf,
228 mask_schema=self.mask_schema,
229 obs_info=self.obs_info,
230 detector=self.detector,
231 band="r",
232 )
234 # Variance unit mismatch.
235 with self.assertRaises(ValueError):
236 VisitImage(
237 self.image,
238 variance=self.image,
239 psf=self.gaussian_psf,
240 mask_schema=self.mask_schema,
241 projection=self.projection,
242 obs_info=self.obs_info,
243 detector=self.detector,
244 band="r",
245 )
247 def test_copy_and_slice(self) -> None:
248 """Test that arrays and components are copied (when not immutable) by
249 'copy' and referenced by 'slice'.
250 """
251 visit = self.visit_image
252 copy = visit.copy()
253 copy.image.array[0, 0] = 30.0
254 self.assertEqual(visit.image.array[0, 0], 42.0)
255 self.assertEqual(copy.image.array[0, 0], 30.0)
256 subvisit = visit[Box.factory[0:5, 0:5]]
257 # Check summary stats.
258 self.assertEqual(copy.summary_stats, visit.summary_stats)
259 self.assertIsNot(copy.summary_stats, visit.summary_stats)
260 self.assertEqual(subvisit.summary_stats, visit.summary_stats)
261 self.assertIs(subvisit.summary_stats, visit.summary_stats)
262 # Check aperture corrections.
263 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys())
264 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections)
265 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys())
266 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections)
267 # Check backgrounds.
268 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys())
269 self.assertIsNot(copy.backgrounds, visit.backgrounds)
270 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys())
271 self.assertIs(subvisit.backgrounds, visit.backgrounds)
272 # Check bounds.
273 self.assertIs(copy.bounds, self.polygon)
274 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox
276 def test_obs_info(self) -> None:
277 """Check that ObservationInfo has been constructed."""
278 visit = self.visit_image
279 self.assertIsNotNone(visit.obs_info)
280 self.maxDiff = None
281 assert visit.obs_info is not None # for mypy.
282 self.assertEqual(visit.obs_info.instrument, "LSSTCam")
284 def test_summary_stats(self) -> None:
285 """Test the comparisons and attributes of ObservationSummaryStats."""
286 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4))
287 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5))
288 self.assertNotEqual(
289 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2))
290 )
292 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
293 def test_round_trip_ndf(self):
294 """NDF round-trip for VisitImage."""
295 with RoundtripNdf(self, self.visit_image, "VisitImage") as roundtrip:
296 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
298 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
299 def test_fits_ndf_consistency(self):
300 """FITS and NDF backends produce equal VisitImages on round-trip."""
301 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt:
302 assert_visit_images_equal(self, self.visit_image, fits_rt.result, expect_view=False)
303 assert_visit_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False)
304 assert_visit_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
306 def test_fits_json_consistency(self):
307 """FITS and JSON backends produce equal VisitImages on round-trip."""
308 with (
309 RoundtripFits(self, self.visit_image) as fits_rt,
310 RoundtripJson(self, self.visit_image) as json_rt,
311 ):
312 assert_visit_images_equal(self, self.visit_image, fits_rt.result, expect_view=False)
313 assert_visit_images_equal(self, self.visit_image, json_rt.result, expect_view=False)
314 assert_visit_images_equal(self, fits_rt.result, json_rt.result, expect_view=False)
316 def test_read_write(self) -> None:
317 """Test that a visit can round trip through a FITS file."""
318 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
319 # Check that we're still using the right compression, and that we
320 # wrote WCSs.
321 fits = roundtrip.inspect()
322 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
323 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN")
324 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
325 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN")
326 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
327 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN")
328 # Check a subimage read.
329 subbox = Box.factory[8:13, 9:30]
330 subimage = roundtrip.get(bbox=subbox)
331 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
332 with self.subTest():
333 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
334 with self.subTest():
335 obs_info = roundtrip.get("obs_info")
336 self.assertIsInstance(obs_info, ObservationInfo)
337 self.assertEqual(obs_info, self.visit_image.obs_info)
338 with self.subTest():
339 summary_stats = roundtrip.get("summary_stats")
340 self.assertIsInstance(summary_stats, ObservationSummaryStats)
341 self.assertEqual(summary_stats, self.visit_image.summary_stats)
342 with self.subTest():
343 psf = roundtrip.get("psf")
344 self.assertIsInstance(psf, GaussianPointSpreadFunction)
345 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox)
346 with self.subTest():
347 backgrounds = roundtrip.get("backgrounds")
348 self.assertIsInstance(backgrounds, BackgroundMap)
349 self.assertEqual(backgrounds.keys(), {"standard"})
350 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField)
351 self.assertEqual(backgrounds.subtracted.name, "standard")
352 self.assertEqual(
353 roundtrip.result.backgrounds.subtracted.description,
354 "Background subtracted from the image.",
355 )
357 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
358 # Check that the round-tripped headers are the same (up to card order).
359 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1)
360 self.assertEqual(
361 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
362 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
363 )
364 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
365 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
366 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
367 # Spot-check the concrete background contents (names, field types,
368 # subtracted entry) against the known fixture, so the equality check
369 # above is not vacuously satisfied by empty background maps.
370 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap)
371 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"})
372 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField)
373 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard")
374 self.assertEqual(
375 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image."
376 )
379class VisitImageLegacyTestMixin:
380 """Tests for the VisitImage class and the basics of the archive, to be
381 specialized for a particular test image.
383 `setUp` or `setUpClass` must be implemented to set the attributes declared
384 in the class.
385 """
387 filename: str
388 legacy_exposure: Any
389 plane_map: dict[str, MaskPlane]
390 visit_image: VisitImage
391 unit: u.UnitBase
393 def test_legacy_errors(self) -> None:
394 """Legacy read failure modes."""
395 with self.assertRaises(ValueError):
396 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC")
397 with self.assertRaises(ValueError):
398 VisitImage.from_legacy(self.legacy_exposure, visit=123456)
399 with self.assertRaises(ValueError):
400 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy)
401 visit = VisitImage.from_legacy(
402 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177
403 )
404 self.assertEqual(visit.unit, self.unit)
406 with self.assertRaises(ValueError):
407 VisitImage.read_legacy(self.filename, instrument="HSC")
408 with self.assertRaises(ValueError):
409 VisitImage.read_legacy(self.filename, visit=123456)
411 def test_component_reads(self) -> None:
412 """Test reads of components from legacy file."""
413 visit = VisitImage.read_legacy(self.filename)
414 proj = VisitImage.read_legacy(self.filename, component="projection")
415 assert_projections_equal(self, proj, visit.projection, expect_identity=False)
416 image = VisitImage.read_legacy(self.filename, component="image")
417 self.assertEqual(image, visit.image)
418 assert_projections_equal(self, proj, image.projection, expect_identity=False)
419 variance = VisitImage.read_legacy(self.filename, component="variance")
420 self.assertEqual(variance, visit.variance)
421 assert_projections_equal(self, proj, variance.projection, expect_identity=False)
422 mask = VisitImage.read_legacy(self.filename, component="mask")
423 self.assertEqual(mask, visit.mask)
424 assert_projections_equal(self, proj, mask.projection, expect_identity=False)
425 psf = VisitImage.read_legacy(self.filename, component="psf")
426 self.assertIsInstance(psf, PointSpreadFunction)
427 obs_info = VisitImage.read_legacy(self.filename, component="obs_info")
428 self.check_legacy_obs_info(obs_info)
429 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats")
430 self.assertIsInstance(summary_stats, ObservationSummaryStats)
431 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar)
432 compare_aperture_corrections_to_legacy(
433 self,
434 VisitImage.read_legacy(self.filename, component="aperture_corrections"),
435 self.legacy_exposure.info.getApCorrMap(),
436 visit.bbox,
437 )
438 detector = VisitImage.read_legacy(self.filename, component="detector")
439 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True)
440 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling")
441 compare_photo_calib_to_legacy(
442 self,
443 photometric_scaling,
444 self.legacy_exposure.getPhotoCalib(),
445 subimage_bbox=visit.bbox,
446 )
448 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None:
449 """Check that an `ObservationInfo` instance is not `None`, and that it
450 matches the one in the legacy test data file.
451 """
452 self.assertIsInstance(obs_info, ObservationInfo)
453 self.assertEqual(obs_info.instrument, "LSSTCam")
454 self.assertEqual(obs_info.detector_num, 85, obs_info)
455 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info)
456 self.assertEqual(obs_info.physical_filter, "r_57", obs_info)
458 def test_obs_info(self) -> None:
459 """Check that ObservationInfo has been constructed."""
460 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
461 self.assertIsNotNone(legacy.obs_info)
462 self.maxDiff = None
463 self.assertEqual(legacy.obs_info, self.visit_image.obs_info)
464 assert legacy.obs_info is not None # for mypy.
465 self.assertEqual(legacy.obs_info.instrument, "LSSTCam")
466 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info)
467 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info)
468 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info)
470 def test_aperture_corrections_to_legacy(self) -> None:
471 """Test that we can convert an aperture correction map back to a
472 legacy `lsst.afw.image.ApCorrMap`.
473 """
474 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections)
475 compare_aperture_corrections_to_legacy(
476 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox
477 )
479 def test_read_legacy_headers(self) -> None:
480 """Test that headers were correctly stripped and interpreted in
481 `VisitImage.read_legacy`.
482 """
483 # Check that we read the units from BUNIT.
484 self.assertEqual(self.visit_image.unit, self.unit)
485 # Check that the primary header has the keys we want, and none of the
486 # keys we don't want.
487 header = self.visit_image._opaque_metadata.headers[ExtensionKey()]
488 self.assertIn("EXPTIME", header)
489 self.assertEqual(header["PLATFORM"], "lsstcam")
490 self.assertNotIn("LSST BUTLER ID", header)
491 self.assertNotIn("AR HDU", header)
492 self.assertNotIn("A_ORDER", header)
493 # Check that the extension HDUs do not have any custom headers.
494 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
495 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
496 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
498 def test_from_legacy_headers(self) -> None:
499 """Test that from_legacy handles headers properly."""
500 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
501 header = legacy._opaque_metadata.headers[ExtensionKey()]
502 self.assertIn("EXPTIME", header)
503 self.assertEqual(header["PLATFORM"], "lsstcam")
504 self.assertNotIn("LSST BUTLER ID", header)
505 self.assertNotIn("AR HDU", header)
506 self.assertNotIn("A_ORDER", header)
507 # Check that the extension HDUs do not have any custom headers.
508 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
509 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
510 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
512 def test_rewrite(self) -> None:
513 """Test that we can rewrite the visit image and preserve both
514 lossy-compressed pixel values and components exactly.
515 """
516 import lsst.afw.image
518 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
519 # Check that we're still using the right compression, and that we
520 # wrote WCSs.
521 fits = roundtrip.inspect()
522 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
523 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP")
524 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
525 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP")
526 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
527 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP")
528 # Check a subimage read.
529 subbox = Box.factory[8:13, 9:30]
530 subimage = roundtrip.get(bbox=subbox)
531 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
532 alternates: dict[str, Any] = {}
533 with self.subTest():
534 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
535 alternates = {
536 k: roundtrip.get(k)
537 for k in [
538 "projection",
539 "image",
540 "mask",
541 "variance",
542 "psf",
543 "obs_info",
544 "summary_stats",
545 "aperture_corrections",
546 "detector",
547 "photometric_scaling",
548 ]
549 }
550 # Test reading back in as an Exposure.
551 with self.subTest():
552 legacy_exposure = roundtrip.get(storageClass="Exposure")
553 self.assertIsInstance(legacy_exposure, lsst.afw.image.Exposure)
554 # This covers most of the compnents, which have clean 1-1
555 # mappings from legacy to new:
556 compare_visit_image_to_legacy(
557 self,
558 self.visit_image,
559 legacy_exposure,
560 expect_view=False,
561 plane_map=self.plane_map,
562 **DP2_VISIT_DETECTOR_DATA_ID,
563 )
564 # A few components are different enough to merit extra
565 # attention:
566 if self.visit_image.unit == u.nJy:
567 self.assertTrue(legacy_exposure.getPhotoCalib()._isConstant)
568 self.assertEqual(legacy_exposure.getPhotoCalib().getCalibrationMean(), 1.0)
569 else:
570 compare_photo_calib_to_legacy(
571 self,
572 self.visit_image.photometric_scaling,
573 legacy_exposure.getPhotoCalib(),
574 subimage_bbox=subbox,
575 )
576 self.assertEqual(legacy_exposure.info.getId(), self.legacy_exposure.info.getId())
577 # Try to do a butler get of a component with storage class
578 # override.
579 with self.subTest():
580 # We have VisitInfo available.
581 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo")
582 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo)
583 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam")
585 assert_visit_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
586 # Check that the round-tripped headers are the same (up to card order).
587 self.assertEqual(
588 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
589 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
590 )
591 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
592 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
593 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
594 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam")
595 compare_visit_image_to_legacy(
596 self,
597 roundtrip.result,
598 self.legacy_exposure,
599 expect_view=False,
600 plane_map=self.plane_map,
601 **DP2_VISIT_DETECTOR_DATA_ID,
602 alternates=alternates,
603 )
604 # Check converting from the legacy object in-memory.
605 compare_visit_image_to_legacy(
606 self,
607 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map),
608 self.legacy_exposure,
609 expect_view=True,
610 plane_map=self.plane_map,
611 **DP2_VISIT_DETECTOR_DATA_ID,
612 )
614 def test_butler_converters(self) -> None:
615 """Test that we can read a VisitImage and its components from a butler
616 dataset written as an `lsst.afw.image.Exposure`.
617 """
618 if self.legacy_exposure is None:
619 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.")
620 with TemporaryButler(legacy="ExposureF") as helper:
621 from lsst.daf.butler import FileDataset
623 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink")
624 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage")
625 with warnings.catch_warnings():
626 # Silence warnings about data ID and filter label disagreeing.
627 warnings.simplefilter("ignore", category=UserWarning)
628 visit_image = helper.butler.get(visit_image_ref)
629 # We didn't ask for the quantization to be preserved, so it
630 # shouldn't be.
631 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), set())
632 # This time preserve the quantization
633 visit_image = helper.butler.get(visit_image_ref, parameters={"preserve_quantization": True})
634 self.assertEqual(visit_image._opaque_metadata.precompressed.keys(), {"IMAGE", "VARIANCE"})
635 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox"))
636 self.assertEqual(bbox, visit_image.bbox)
637 alternates = {
638 k: helper.butler.get(visit_image_ref.makeComponentRef(k))
639 # TODO: including "projection" or "obs_info" here fails because
640 # there's code in daf_butler that expects any component to be
641 # valid for the *internal* storage class, not the requested
642 # one, and that's difficult to fix because it's tied up with
643 # the data ID standardization logic.
644 for k in ["image", "mask", "variance", "bbox", "psf", "detector"]
645 }
646 compare_visit_image_to_legacy(
647 self,
648 visit_image,
649 self.legacy_exposure,
650 expect_view=False,
651 plane_map=self.plane_map,
652 alternates=alternates,
653 **DP2_VISIT_DETECTOR_DATA_ID,
654 )
655 # Add some metadata to the new VisitImage and then do a converting
656 # `put` that should write to the old format (we have to delete the
657 # old one first, which just deletes a symlink).
658 helper.butler.pruneDatasets([helper.legacy], purge=True, unstore=True, disassociate=True)
659 visit_image.metadata["MixedCaseKey"] = 52
660 helper.butler.put(visit_image, visit_image_ref)
661 # Check that we can read *that* back in as a legacy exposure.
662 legacy_exposure = helper.butler.get(helper.legacy)
663 compare_visit_image_to_legacy(
664 self,
665 visit_image,
666 legacy_exposure,
667 expect_view=False,
668 plane_map=self.plane_map,
669 alternates=alternates,
670 **DP2_VISIT_DETECTOR_DATA_ID,
671 )
672 # Check that we can read it back in as a VisitImage, and that the
673 # new metadata is preserved.
674 visit_image_2 = helper.butler.get(visit_image_ref)
675 compare_visit_image_to_legacy(
676 self,
677 visit_image_2,
678 legacy_exposure,
679 expect_view=False,
680 plane_map=self.plane_map,
681 alternates=alternates,
682 **DP2_VISIT_DETECTOR_DATA_ID,
683 )
684 self.assertEqual(visit_image_2.metadata["MixedCaseKey"], 52)
687@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
688class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
689 """Tests for the VisitImage class using a DRP-final visit_image dataset.
691 Requires legacy code.
692 """
694 @classmethod
695 def setUpClass(cls) -> None:
696 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
697 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits")
698 try:
699 from lsst.afw.image import ExposureFitsReader
701 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
702 except ImportError:
703 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
704 cls.plane_map = get_legacy_visit_image_mask_planes()
705 cls.visit_image = VisitImage.read_legacy(
706 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
707 )
708 cls.unit = u.nJy
710 def test_convert_unit(self) -> None:
711 """Test using the ``photometric_scaling`` to swap between
712 calibrated and instrumental units.
714 This includes tests of the `VisitImage.to_legacy` logic for units that
715 don't map directly to `lsst.afw.image.PhotoCalib` conventions.
716 """
717 from lsst.afw.table import ExposureCatalog
719 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
720 # Make a copy of class state so we can modify it without breaking
721 # other tests.
722 original = self.visit_image.copy()
723 # We should not be able to convert to instrumental units when there is
724 # no photometric scaling.
725 with self.assertRaises(u.UnitConversionError):
726 original.convert_unit(u.electron)
727 # Converting to the current unit should be a no-op that does not need
728 # to copy.
729 visit_image_nJy = original.convert_unit(u.nJy, copy=False)
730 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, original.image.array))
731 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, original.variance.array))
732 # Even without a photometric_scaling attached, we should be able to
733 # convert to a compatible unit, but only if we allow copies.
734 with self.assertRaises(u.UnitConversionError):
735 original.convert_unit(u.mJy, copy=False)
736 visit_image_mJy = original.convert_unit(u.mJy, copy="as-needed")
737 self.assertEqual(visit_image_mJy.unit, u.mJy)
738 assert_close(self, visit_image_mJy.image.array, original.image.array * 1e-6)
739 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, original.mask.array))
740 assert_close(self, visit_image_mJy.variance.array, original.variance.array * 1e-12)
741 # Converting a mJy image to legacy should make a PhotoCalib that maps
742 # mJy to nJy.
743 legacy_exposure_mJy = visit_image_mJy.to_legacy()
744 assert_close(self, legacy_exposure_mJy.getPhotoCalib().getCalibrationMean(), 1e6)
745 legacy_masked_image_nJy = legacy_exposure_mJy.getPhotoCalib().calibrateImage(
746 legacy_exposure_mJy.maskedImage
747 )
748 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array)
749 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array)
750 # Test that we haven't dropped any component objects along the way,
751 # and that they're all still the same objects or thin views.
752 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, original.mask.array))
753 self.assertIs(visit_image_mJy.projection, original.projection)
754 self.assertIs(visit_image_mJy.obs_info, original.obs_info)
755 self.assertIs(visit_image_mJy.summary_stats, original.summary_stats)
756 self.assertIs(visit_image_mJy.psf, original.psf)
757 self.assertIs(visit_image_mJy.detector, original.detector)
758 self.assertIs(visit_image_mJy.bounds, original.bounds)
759 self.assertIs(visit_image_mJy.aperture_corrections, original.aperture_corrections)
760 self.assertIs(visit_image_mJy.photometric_scaling, original.photometric_scaling)
761 # Attach the final PhotoCalib (this isn't stored with the legacy file
762 # because that is the mapping to nJy, which is trivial).
763 visit_summary = ExposureCatalog.readFits(
764 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits")
765 )
766 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib()
767 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib(
768 legacy_photo_calib, bounds=original.detector.bbox, instrumental_unit=u.electron
769 )
770 compare_photo_calib_to_legacy(
771 self,
772 visit_image_nJy.photometric_scaling,
773 self.legacy_exposure.getPhotoCalib(),
774 applied_legacy_photo_calib=legacy_photo_calib,
775 subimage_bbox=visit_image_nJy.bbox,
776 )
777 # We still can't convert to completely unrelated units.
778 with self.assertRaises(u.UnitConversionError):
779 visit_image_nJy.convert_unit(u.mm)
780 # Uncalibrating via the photometric_scaling matches what legacy code
781 # does, and by default it copies everything.
782 with self.assertRaises(u.UnitConversionError):
783 visit_image_nJy.convert_unit(u.electron, copy=False)
784 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage)
785 visit_image_e = visit_image_nJy.convert_unit(u.electron)
786 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
787 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
788 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array))
789 # We can also uncalibrate if we start with an image that has units
790 # that are compatible with the photometric_scaling but not identical
791 # to it.
792 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling
793 visit_image_e = visit_image_mJy.convert_unit(u.electron)
794 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
795 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
796 # We can re-apply the scaling to go back to calibrated units.
797 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy)
798 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array)
799 assert_close(self, visit_image_nJy_2.variance.array, original.variance.array)
800 # Try calibrating an image with a scaling that has units other than
801 # nJy in the numerator.
802 visit_image_e.photometric_scaling = visit_image_nJy.photometric_scaling * (1e-6 * u.mJy / u.nJy)
803 visit_image_nJy_3 = visit_image_e.convert_unit(u.nJy)
804 assert_close(self, visit_image_nJy_3.image.array, visit_image_nJy.image.array)
805 assert_close(self, visit_image_nJy_3.variance.array, original.variance.array)
806 # Try converting that uncalibrated image to legacy; the extra mJy/nJy
807 # factor should get included in the PhotoCalib to recover the original
808 # PhotoCalib.
809 legacy_exposure_e = visit_image_e.to_legacy()
810 assert_close(
811 self,
812 legacy_exposure_e.getPhotoCalib().getCalibrationMean(),
813 legacy_photo_calib.getCalibrationMean(),
814 )
815 legacy_masked_image_nJy = legacy_exposure_e.getPhotoCalib().calibrateImage(
816 legacy_exposure_e.maskedImage
817 )
818 assert_close(self, visit_image_nJy.image.array, legacy_masked_image_nJy.image.array)
819 assert_close(self, visit_image_nJy.variance.array, legacy_masked_image_nJy.variance.array)
822@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
823class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
824 """Tests for the VisitImage class using a DRP preliminary_visit_image
825 dataset.
827 Requires legacy code.
828 """
830 @classmethod
831 def setUpClass(cls) -> None:
832 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
833 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits")
834 try:
835 from lsst.afw.image import ExposureFitsReader
837 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
838 except ImportError:
839 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
840 cls.plane_map = get_legacy_visit_image_mask_planes()
841 cls.visit_image = VisitImage.read_legacy(
842 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
843 )
844 cls.unit = u.electron
847if __name__ == "__main__":
848 unittest.main()