Coverage for tests / test_visit_image.py: 16%
371 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:47 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 08:47 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
16import warnings
17from typing import Any
19import astropy.io.fits
20import astropy.units as u
21import astropy.wcs
22import numpy as np
23from astro_metadata_translator import ObservationInfo
25from lsst.images import (
26 BackgroundMap,
27 Box,
28 DetectorFrame,
29 Image,
30 MaskPlane,
31 MaskSchema,
32 ObservationSummaryStats,
33 Polygon,
34 ProjectionAstropyView,
35 TractFrame,
36 VisitImage,
37 get_legacy_visit_image_mask_planes,
38)
39from lsst.images.aperture_corrections import ApertureCorrectionMap, aperture_corrections_to_legacy
40from lsst.images.cameras import Detector
41from lsst.images.fields import ChebyshevField, field_from_legacy_photo_calib
42from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
43from lsst.images.json import read as read_json
44from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction
45from lsst.images.tests import (
46 DP2_VISIT_DETECTOR_DATA_ID,
47 RoundtripFits,
48 RoundtripNdf,
49 TemporaryButler,
50 assert_close,
51 assert_masked_images_equal,
52 assert_projections_equal,
53 compare_aperture_corrections_to_legacy,
54 compare_detector_to_legacy,
55 compare_photo_calib_to_legacy,
56 compare_visit_image_to_legacy,
57 make_random_projection,
58)
60try:
61 import h5py # noqa: F401
63 HAVE_H5PY = True
64except ImportError:
65 HAVE_H5PY = False
67EXTERNAL_DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
68LOCAL_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
71class VisitImageTestCase(unittest.TestCase):
72 """Basic Tests for VisitImage."""
74 @classmethod
75 def setUpClass(cls) -> None:
76 cls.rng = np.random.default_rng(500)
77 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096])
78 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")])
79 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4)
80 cls.summary_stats = ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4)
81 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13])
82 cls.aperture_corrections: ApertureCorrectionMap = {
83 "flux1": ChebyshevField(det_frame.bbox, np.array([0.75])),
84 "flux2": ChebyshevField(det_frame.bbox, np.array([0.625])),
85 }
86 cls.detector, _, _ = read_json(Detector, os.path.join(LOCAL_DATA_DIR, "detector.json"))
88 opaque = FitsOpaqueMetadata()
89 hdr = astropy.io.fits.Header()
90 with warnings.catch_warnings():
91 # Silence warnings about long keys becoming HIERARCH.
92 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
93 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"})
94 opaque.extract_legacy_primary_header(hdr)
96 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy)
97 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy)
98 # polygon is the lower triangle of the image.
99 cls.polygon = Polygon(x_vertices=[-0.5, 1023.5, -0.5], y_vertices=[-0.5, -0.5, 1023.5])
100 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096])
101 # API signature suggests projection and obs_info can be None but they
102 # are required (unless you pass them in via the image plane).
103 cls.visit_image = VisitImage(
104 cls.image,
105 variance=cls.variance,
106 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
107 mask_schema=cls.mask_schema,
108 projection=cls.projection,
109 obs_info=cls.obs_info,
110 summary_stats=cls.summary_stats,
111 detector=cls.detector,
112 bounds=cls.polygon,
113 aperture_corrections=cls.aperture_corrections,
114 )
115 cls.visit_image.backgrounds.add(
116 "standard",
117 ChebyshevField(det_frame.bbox, np.array([[2.0]])),
118 description="Background subtracted from the image.",
119 is_subtracted=True,
120 )
121 cls.visit_image._opaque_metadata = opaque
122 cls.simplest_visit_image = VisitImage(
123 cls.image,
124 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
125 mask_schema=cls.mask_schema,
126 projection=cls.projection,
127 detector=cls.detector,
128 obs_info=cls.obs_info,
129 )
131 def test_basics(self) -> None:
132 """Test basic constructor patterns."""
133 # Test default fill of variance.
134 visit = self.simplest_visit_image
135 self.assertEqual(visit.variance.array[0, 0], 1.0)
136 self.assertIs(visit[...], visit)
137 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])")
138 self.assertEqual(
139 repr(visit),
140 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024)),"
141 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')],"
142 " dtype=dtype('uint8')))",
143 )
145 astropy_wcs = visit.astropy_wcs
146 self.assertIsInstance(astropy_wcs, ProjectionAstropyView)
147 approx_wcs = visit.fits_wcs
148 self.assertIsInstance(approx_wcs, astropy.wcs.WCS)
150 with self.assertRaises(TypeError):
151 # Requires a PSF.
152 VisitImage(
153 self.image,
154 mask_schema=self.mask_schema,
155 projection=self.projection,
156 obs_info=self.obs_info,
157 detector=self.detector,
158 )
160 with self.assertRaises(TypeError):
161 # Requires ObservationInfo.
162 VisitImage(
163 self.image,
164 psf=self.gaussian_psf,
165 mask_schema=self.mask_schema,
166 projection=self.projection,
167 detector=self.detector,
168 )
170 with self.assertRaises(TypeError):
171 # Requires a projection.
172 VisitImage(
173 self.image,
174 psf=self.gaussian_psf,
175 mask_schema=self.mask_schema,
176 obs_info=self.obs_info,
177 detector=self.detector,
178 )
180 with self.assertRaises(TypeError):
181 # Requires a detector.
182 VisitImage(
183 self.image,
184 psf=self.gaussian_psf,
185 mask_schema=self.mask_schema,
186 projection=self.projection,
187 obs_info=self.obs_info,
188 )
190 with self.assertRaises(TypeError):
191 # Requires some form of mask.
192 VisitImage(
193 self.image,
194 psf=self.gaussian_psf,
195 projection=self.projection,
196 obs_info=self.obs_info,
197 detector=self.detector,
198 )
200 with self.assertRaises(TypeError):
201 VisitImage(
202 Image(42, shape=(5, 5)),
203 psf=self.gaussian_psf,
204 mask_schema=self.mask_schema,
205 projection=self.projection,
206 obs_info=self.obs_info,
207 detector=self.detector,
208 )
210 # Requires a DetectorFrame.
211 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10])
212 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096])
213 with self.assertRaises(TypeError):
214 VisitImage(
215 self.image,
216 projection=tract_proj,
217 psf=self.gaussian_psf,
218 mask_schema=self.mask_schema,
219 obs_info=self.obs_info,
220 detector=self.detector,
221 )
223 # Variance unit mismatch.
224 with self.assertRaises(ValueError):
225 VisitImage(
226 self.image,
227 variance=self.image,
228 psf=self.gaussian_psf,
229 mask_schema=self.mask_schema,
230 projection=self.projection,
231 obs_info=self.obs_info,
232 detector=self.detector,
233 )
235 def test_copy_and_slice(self) -> None:
236 """Test that arrays and components are copied (when not immutable) by
237 'copy' and referenced by 'slice'.
238 """
239 visit = self.visit_image
240 copy = visit.copy()
241 copy.image.array[0, 0] = 30.0
242 self.assertEqual(visit.image.array[0, 0], 42.0)
243 self.assertEqual(copy.image.array[0, 0], 30.0)
244 subvisit = visit[Box.factory[0:5, 0:5]]
245 # Check summary stats.
246 self.assertEqual(copy.summary_stats, visit.summary_stats)
247 self.assertIsNot(copy.summary_stats, visit.summary_stats)
248 self.assertEqual(subvisit.summary_stats, visit.summary_stats)
249 self.assertIs(subvisit.summary_stats, visit.summary_stats)
250 # Check aperture corrections.
251 self.assertEqual(copy.aperture_corrections.keys(), visit.aperture_corrections.keys())
252 self.assertIsNot(copy.aperture_corrections, visit.aperture_corrections)
253 self.assertEqual(subvisit.aperture_corrections.keys(), visit.aperture_corrections.keys())
254 self.assertIs(subvisit.aperture_corrections, visit.aperture_corrections)
255 # Check backgrounds.
256 self.assertEqual(copy.backgrounds.keys(), visit.backgrounds.keys())
257 self.assertIsNot(copy.backgrounds, visit.backgrounds)
258 self.assertEqual(subvisit.backgrounds.keys(), visit.backgrounds.keys())
259 self.assertIs(subvisit.backgrounds, visit.backgrounds)
260 # Check bounds.
261 self.assertIs(copy.bounds, self.polygon)
262 self.assertEqual(subvisit.bounds, subvisit.bbox) # original polygon wholly encloses subvisit.bbox
264 def test_obs_info(self) -> None:
265 """Check that ObservationInfo has been constructed."""
266 visit = self.visit_image
267 self.assertIsNotNone(visit.obs_info)
268 self.maxDiff = None
269 assert visit.obs_info is not None # for mypy.
270 self.assertEqual(visit.obs_info.instrument, "LSSTCam")
272 def test_summary_stats(self) -> None:
273 """Test the comparisons and attributes of ObservationSummaryStats."""
274 self.assertEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5, zeroPoint=31.4))
275 self.assertNotEqual(self.summary_stats, ObservationSummaryStats(psfSigma=2.5))
276 self.assertNotEqual(
277 self.summary_stats, ObservationSummaryStats(psfSigma=2.5, raCorners=(5.2, 5.4, 5.4, 5.2))
278 )
280 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
281 def test_round_trip_ndf(self):
282 """NDF round-trip for VisitImage."""
283 with RoundtripNdf(self, self.visit_image) as roundtrip:
284 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
285 self.assertEqual(roundtrip.result.summary_stats, self.visit_image.summary_stats)
286 self.assertEqual(type(roundtrip.result.psf), type(self.visit_image.psf))
288 @unittest.skipUnless(HAVE_H5PY, "h5py is not installed")
289 def test_fits_ndf_consistency(self):
290 """FITS and NDF backends produce equal VisitImages on round-trip."""
291 with RoundtripFits(self, self.visit_image) as fits_rt, RoundtripNdf(self, self.visit_image) as ndf_rt:
292 assert_masked_images_equal(self, self.visit_image, fits_rt.result, expect_view=False)
293 assert_masked_images_equal(self, self.visit_image, ndf_rt.result, expect_view=False)
294 assert_masked_images_equal(self, fits_rt.result, ndf_rt.result, expect_view=False)
296 def test_read_write(self) -> None:
297 """Test that a visit can round trip through a FITS file."""
298 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
299 # Check that we're still using the right compression, and that we
300 # wrote WCSs.
301 fits = roundtrip.inspect()
302 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
303 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN")
304 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
305 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN")
306 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
307 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN")
308 # Check a subimage read.
309 subbox = Box.factory[8:13, 9:30]
310 subimage = roundtrip.get(bbox=subbox)
311 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
312 with self.subTest():
313 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
314 with self.subTest():
315 obs_info = roundtrip.get("obs_info")
316 self.assertIsInstance(obs_info, ObservationInfo)
317 self.assertEqual(obs_info, self.visit_image.obs_info)
318 with self.subTest():
319 summary_stats = roundtrip.get("summary_stats")
320 self.assertIsInstance(summary_stats, ObservationSummaryStats)
321 self.assertEqual(summary_stats, self.visit_image.summary_stats)
322 with self.subTest():
323 psf = roundtrip.get("psf")
324 self.assertIsInstance(psf, GaussianPointSpreadFunction)
325 self.assertEqual(psf.kernel_bbox, self.gaussian_psf.kernel_bbox)
326 with self.subTest():
327 backgrounds = roundtrip.get("backgrounds")
328 self.assertIsInstance(backgrounds, BackgroundMap)
329 self.assertEqual(backgrounds.keys(), {"standard"})
330 self.assertIsInstance(backgrounds["standard"].field, ChebyshevField)
331 self.assertEqual(backgrounds.subtracted.name, "standard")
332 self.assertEqual(
333 roundtrip.result.backgrounds.subtracted.description,
334 "Background subtracted from the image.",
335 )
337 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
338 # Check that the round-tripped headers are the same (up to card order).
339 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1)
340 self.assertEqual(
341 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
342 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
343 )
344 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
345 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
346 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
347 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info)
348 self.assertIsNotNone(roundtrip.result.summary_stats)
349 self.assertEqual(
350 roundtrip.result.summary_stats.psfSigma,
351 self.visit_image.summary_stats.psfSigma,
352 )
353 self.assertEqual(
354 roundtrip.result.summary_stats.zeroPoint,
355 self.visit_image.summary_stats.zeroPoint,
356 )
357 self.assertEqual(roundtrip.result.bounds, self.polygon)
358 self.assertIsInstance(roundtrip.result.backgrounds, BackgroundMap)
359 self.assertEqual(roundtrip.result.backgrounds.keys(), {"standard"})
360 self.assertIsInstance(roundtrip.result.backgrounds["standard"].field, ChebyshevField)
361 self.assertEqual(roundtrip.result.backgrounds.subtracted.name, "standard")
362 self.assertEqual(
363 roundtrip.result.backgrounds.subtracted.description, "Background subtracted from the image."
364 )
367class VisitImageLegacyTestMixin:
368 """Tests for the VisitImage class and the basics of the archive, to be
369 specialized for a particular test image.
371 `setUp` or `setUpClass` must be implemented to set the attributes declared
372 in the class.
373 """
375 filename: str
376 legacy_exposure: Any
377 plane_map: dict[str, MaskPlane]
378 visit_image: VisitImage
379 unit: u.UnitBase
381 def test_legacy_errors(self) -> None:
382 """Legacy read failure modes."""
383 with self.assertRaises(ValueError):
384 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC")
385 with self.assertRaises(ValueError):
386 VisitImage.from_legacy(self.legacy_exposure, visit=123456)
387 with self.assertRaises(ValueError):
388 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy)
389 visit = VisitImage.from_legacy(
390 self.legacy_exposure, instrument="LSSTCam", unit=self.unit, visit=2025052000177
391 )
392 self.assertEqual(visit.unit, self.unit)
394 with self.assertRaises(ValueError):
395 VisitImage.read_legacy(self.filename, instrument="HSC")
396 with self.assertRaises(ValueError):
397 VisitImage.read_legacy(self.filename, visit=123456)
399 def test_component_reads(self) -> None:
400 """Test reads of components from legacy file."""
401 visit = VisitImage.read_legacy(self.filename)
402 proj = VisitImage.read_legacy(self.filename, component="projection")
403 assert_projections_equal(self, proj, visit.projection, expect_identity=False)
404 image = VisitImage.read_legacy(self.filename, component="image")
405 self.assertEqual(image, visit.image)
406 assert_projections_equal(self, proj, image.projection, expect_identity=False)
407 variance = VisitImage.read_legacy(self.filename, component="variance")
408 self.assertEqual(variance, visit.variance)
409 assert_projections_equal(self, proj, variance.projection, expect_identity=False)
410 mask = VisitImage.read_legacy(self.filename, component="mask")
411 self.assertEqual(mask, visit.mask)
412 assert_projections_equal(self, proj, mask.projection, expect_identity=False)
413 psf = VisitImage.read_legacy(self.filename, component="psf")
414 self.assertIsInstance(psf, PointSpreadFunction)
415 obs_info = VisitImage.read_legacy(self.filename, component="obs_info")
416 self.check_legacy_obs_info(obs_info)
417 summary_stats = VisitImage.read_legacy(self.filename, component="summary_stats")
418 self.assertIsInstance(summary_stats, ObservationSummaryStats)
419 self.assertEqual(summary_stats.nPsfStar, self.legacy_exposure.info.getSummaryStats().nPsfStar)
420 compare_aperture_corrections_to_legacy(
421 self,
422 VisitImage.read_legacy(self.filename, component="aperture_corrections"),
423 self.legacy_exposure.info.getApCorrMap(),
424 visit.bbox,
425 )
426 detector = VisitImage.read_legacy(self.filename, component="detector")
427 compare_detector_to_legacy(self, detector, self.legacy_exposure.getDetector(), is_raw_assembled=True)
428 photometric_scaling = VisitImage.read_legacy(self.filename, component="photometric_scaling")
429 compare_photo_calib_to_legacy(
430 self,
431 photometric_scaling,
432 self.legacy_exposure.getPhotoCalib(),
433 subimage_bbox=visit.bbox,
434 )
436 def check_legacy_obs_info(self, obs_info: ObservationInfo | None) -> None:
437 """Check that an `ObservationInfo` instance is not `None`, and that it
438 matches the one in the legacy test data file.
439 """
440 self.assertIsInstance(obs_info, ObservationInfo)
441 self.assertEqual(obs_info.instrument, "LSSTCam")
442 self.assertEqual(obs_info.detector_num, 85, obs_info)
443 self.assertEqual(obs_info.detector_unique_name, "R21_S11", obs_info)
444 self.assertEqual(obs_info.physical_filter, "r_57", obs_info)
446 def test_obs_info(self) -> None:
447 """Check that ObservationInfo has been constructed."""
448 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
449 self.assertIsNotNone(legacy.obs_info)
450 self.maxDiff = None
451 self.assertEqual(legacy.obs_info, self.visit_image.obs_info)
452 assert legacy.obs_info is not None # for mypy.
453 self.assertEqual(legacy.obs_info.instrument, "LSSTCam")
454 self.assertEqual(legacy.obs_info.detector_num, 85, legacy.obs_info)
455 self.assertEqual(legacy.obs_info.detector_unique_name, "R21_S11", legacy.obs_info)
456 self.assertEqual(legacy.obs_info.physical_filter, "r_57", legacy.obs_info)
458 def test_aperture_corrections_to_legacy(self) -> None:
459 """Test that we can convert an aperture correction map back to a
460 legacy `lsst.afw.image.ApCorrMap`.
461 """
462 legacy_ap_corr_map = aperture_corrections_to_legacy(self.visit_image.aperture_corrections)
463 compare_aperture_corrections_to_legacy(
464 self, self.visit_image.aperture_corrections, legacy_ap_corr_map, self.visit_image.bbox
465 )
467 def test_read_legacy_headers(self) -> None:
468 """Test that headers were correctly stripped and interpreted in
469 `VisitImage.read_legacy`.
470 """
471 # Check that we read the units from BUNIT.
472 self.assertEqual(self.visit_image.unit, self.unit)
473 # Check that the primary header has the keys we want, and none of the
474 # keys we don't want.
475 header = self.visit_image._opaque_metadata.headers[ExtensionKey()]
476 self.assertIn("EXPTIME", header)
477 self.assertEqual(header["PLATFORM"], "lsstcam")
478 self.assertNotIn("LSST BUTLER ID", header)
479 self.assertNotIn("AR HDU", header)
480 self.assertNotIn("A_ORDER", header)
481 # Check that the extension HDUs do not have any custom headers.
482 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
483 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
484 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
486 def test_from_legacy_headers(self) -> None:
487 """Test that from_legacy handles headers properly."""
488 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
489 header = legacy._opaque_metadata.headers[ExtensionKey()]
490 self.assertIn("EXPTIME", header)
491 self.assertEqual(header["PLATFORM"], "lsstcam")
492 self.assertNotIn("LSST BUTLER ID", header)
493 self.assertNotIn("AR HDU", header)
494 self.assertNotIn("A_ORDER", header)
495 # Check that the extension HDUs do not have any custom headers.
496 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
497 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
498 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
500 def test_rewrite(self) -> None:
501 """Test that we can rewrite the visit image and preserve both
502 lossy-compressed pixel values and components exactly.
503 """
504 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
505 # Check that we're still using the right compression, and that we
506 # wrote WCSs.
507 fits = roundtrip.inspect()
508 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
509 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP")
510 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
511 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP")
512 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
513 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP")
514 # Check a subimage read.
515 subbox = Box.factory[8:13, 9:30]
516 subimage = roundtrip.get(bbox=subbox)
517 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
518 alternates: dict[str, Any] = {}
519 with self.subTest():
520 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
521 alternates = {
522 k: roundtrip.get(k)
523 for k in [
524 "projection",
525 "image",
526 "mask",
527 "variance",
528 "psf",
529 "obs_info",
530 "summary_stats",
531 "aperture_corrections",
532 "detector",
533 "photometric_scaling",
534 ]
535 }
536 # Try to do a butler get of a component with storage class
537 # override.
538 with self.subTest():
539 if self.legacy_exposure is not None:
540 import lsst.afw.image
542 # We have VisitInfo available.
543 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo")
544 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo)
545 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam")
546 else:
547 raise unittest.SkipTest("Can not test VisitInfo conversion without afw")
549 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
550 # Check that the round-tripped headers are the same (up to card order).
551 self.assertEqual(
552 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
553 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
554 )
555 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
556 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
557 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
558 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam")
559 compare_visit_image_to_legacy(
560 self,
561 roundtrip.result,
562 self.legacy_exposure,
563 expect_view=False,
564 plane_map=self.plane_map,
565 **DP2_VISIT_DETECTOR_DATA_ID,
566 alternates=alternates,
567 )
568 # Check converting from the legacy object in-memory.
569 compare_visit_image_to_legacy(
570 self,
571 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map),
572 self.legacy_exposure,
573 expect_view=True,
574 plane_map=self.plane_map,
575 **DP2_VISIT_DETECTOR_DATA_ID,
576 )
578 def test_butler_converters(self) -> None:
579 """Test that we can read a VisitImage and its components from a butler
580 dataset written as an `lsst.afw.image.Exposure`.
581 """
582 if self.legacy_exposure is None:
583 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.")
584 with TemporaryButler(legacy="ExposureF") as helper:
585 from lsst.daf.butler import FileDataset
587 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink")
588 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage")
589 visit_image = helper.butler.get(visit_image_ref)
590 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox"))
591 self.assertEqual(bbox, visit_image.bbox)
592 alternates = {
593 k: helper.butler.get(visit_image_ref.makeComponentRef(k))
594 # TODO: including "projection" or "obs_info" here fails because
595 # there's code in daf_butler that expects any component to be
596 # valid for the *internal* storage class, not the requested
597 # one, and that's difficult to fix because it's tied up with
598 # the data ID standardization logic.
599 for k in ["image", "mask", "variance", "psf", "detector"]
600 }
601 compare_visit_image_to_legacy(
602 self,
603 visit_image,
604 self.legacy_exposure,
605 expect_view=False,
606 plane_map=self.plane_map,
607 alternates=alternates,
608 **DP2_VISIT_DETECTOR_DATA_ID,
609 )
612@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
613class VisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
614 """Tests for the VisitImage class using a DRP-final visit_image dataset.
616 Requires legacy code.
617 """
619 @classmethod
620 def setUpClass(cls) -> None:
621 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
622 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_image.fits")
623 try:
624 from lsst.afw.image import ExposureFitsReader
626 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
627 except ImportError:
628 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
629 cls.plane_map = get_legacy_visit_image_mask_planes()
630 cls.visit_image = VisitImage.read_legacy(
631 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
632 )
633 cls.unit = u.nJy
635 def test_convert_unit(self) -> None:
636 """Test using the ``photometric_scaling`` to swap between
637 calibrated and instrumental units.
638 """
639 from lsst.afw.table import ExposureCatalog
641 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
642 # Make a copy of class state so we can modify it without breaking
643 # other tests.
644 original = self.visit_image.copy()
645 # We should not be able to convert to instrumental units when there is
646 # no photometric scaling.
647 with self.assertRaises(u.UnitConversionError):
648 original.convert_unit(u.electron)
649 # Converting to the current unit should be a no-op that does not need
650 # to copy.
651 visit_image_nJy = original.convert_unit(u.nJy, copy=False)
652 self.assertTrue(np.may_share_memory(visit_image_nJy.image.array, original.image.array))
653 self.assertTrue(np.may_share_memory(visit_image_nJy.variance.array, original.variance.array))
654 # Even without a photometric_scaling attached, we should be able to
655 # convert to a compatible unit, but only if we allow copies.
656 with self.assertRaises(u.UnitConversionError):
657 original.convert_unit(u.mJy, copy=False)
658 visit_image_mJy = original.convert_unit(u.mJy, copy="as-needed")
659 self.assertEqual(visit_image_mJy.unit, u.mJy)
660 assert_close(self, visit_image_mJy.image.array, original.image.array * 1e-6)
661 self.assertTrue(np.may_share_memory(visit_image_nJy.mask.array, original.mask.array))
662 assert_close(self, visit_image_mJy.variance.array, original.variance.array * 1e-12)
663 # Test that we haven't dropped any component objects along the way,
664 # and that they're all still the same objects or thin views.
665 self.assertTrue(np.may_share_memory(visit_image_mJy.mask.array, original.mask.array))
666 self.assertIs(visit_image_mJy.projection, original.projection)
667 self.assertIs(visit_image_mJy.obs_info, original.obs_info)
668 self.assertIs(visit_image_mJy.summary_stats, original.summary_stats)
669 self.assertIs(visit_image_mJy.psf, original.psf)
670 self.assertIs(visit_image_mJy.detector, original.detector)
671 self.assertIs(visit_image_mJy.bounds, original.bounds)
672 self.assertIs(visit_image_mJy.aperture_corrections, original.aperture_corrections)
673 self.assertIs(visit_image_mJy.photometric_scaling, original.photometric_scaling)
674 # Attach the final PhotoCalib (this isn't stored with the legacy file
675 # because that is the mapping to nJy, which is trivial).
676 visit_summary = ExposureCatalog.readFits(
677 os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "visit_summary.fits")
678 )
679 legacy_photo_calib = visit_summary.find(DP2_VISIT_DETECTOR_DATA_ID["detector"]).getPhotoCalib()
680 visit_image_nJy.photometric_scaling = field_from_legacy_photo_calib(
681 legacy_photo_calib, bounds=original.detector.bbox, instrumental_unit=u.electron
682 )
683 compare_photo_calib_to_legacy(
684 self,
685 visit_image_nJy.photometric_scaling,
686 self.legacy_exposure.getPhotoCalib(),
687 applied_legacy_photo_calib=legacy_photo_calib,
688 subimage_bbox=visit_image_nJy.bbox,
689 )
690 # We still can't convert to completely unrelated units.
691 with self.assertRaises(u.UnitConversionError):
692 visit_image_nJy.convert_unit(u.mm)
693 # Uncalibrating via the photometric_scaling matches what legacy code
694 # does, and by default it copies everything.
695 with self.assertRaises(u.UnitConversionError):
696 visit_image_nJy.convert_unit(u.electron, copy=False)
697 legacy_masked_image_e = legacy_photo_calib.uncalibrateImage(self.legacy_exposure.maskedImage)
698 visit_image_e = visit_image_nJy.convert_unit(u.electron)
699 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
700 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
701 self.assertFalse(np.may_share_memory(visit_image_e.mask.array, visit_image_nJy.mask.array))
702 # We can also uncalibrate if we start with an image that has units
703 # that are compatible with the photometric_scaling but not identical
704 # to it.
705 visit_image_mJy.photometric_scaling = visit_image_nJy.photometric_scaling
706 visit_image_e = visit_image_mJy.convert_unit(u.electron)
707 assert_close(self, visit_image_e.image.array, legacy_masked_image_e.image.array)
708 assert_close(self, visit_image_e.variance.array, legacy_masked_image_e.variance.array)
709 # We can re-apply the scaling go go back to calibrated units.
710 visit_image_nJy_2 = visit_image_e.convert_unit(u.nJy)
711 assert_close(self, visit_image_nJy_2.image.array, visit_image_nJy.image.array)
712 assert_close(self, visit_image_nJy_2.variance.array, original.variance.array)
715@unittest.skipUnless(EXTERNAL_DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
716class PreliminaryVisitImageLegacyTestCase(unittest.TestCase, VisitImageLegacyTestMixin):
717 """Tests for the VisitImage class using a DRP preliminary_visit_image
718 dataset.
720 Requires legacy code.
721 """
723 @classmethod
724 def setUpClass(cls) -> None:
725 assert EXTERNAL_DATA_DIR is not None, "Guaranteed by decorator."
726 cls.filename = os.path.join(EXTERNAL_DATA_DIR, "dp2", "legacy", "preliminary_visit_image.fits")
727 try:
728 from lsst.afw.image import ExposureFitsReader
730 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
731 except ImportError:
732 raise unittest.SkipTest("afw not available; cannot read legacy visit images") from None
733 cls.plane_map = get_legacy_visit_image_mask_planes()
734 cls.visit_image = VisitImage.read_legacy(
735 cls.filename, preserve_quantization=True, plane_map=cls.plane_map
736 )
737 cls.unit = u.electron
740if __name__ == "__main__":
741 unittest.main()