Coverage for tests/test_coadds.py: 18%
278 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:01 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-03 08:01 +0000
1# This file is part of cell_coadds.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
23from collections.abc import Iterable, Mapping
24from itertools import product
26import numpy as np
27from frozendict import frozendict
29import lsst.cell_coadds.test_utils as test_utils
30import lsst.geom as geom
31import lsst.meas.base.tests
32import lsst.utils.tests
33from lsst.afw.geom import Polygon, Quadrupole
34from lsst.afw.image import ExposureF, ImageF
35from lsst.cell_coadds import (
36 CellCoaddFitsReader,
37 CellIdentifiers,
38 CoaddInputs,
39 CoaddUnits,
40 CommonComponents,
41 ExplodedCoadd,
42 MultipleCellCoadd,
43 ObservationIdentifiers,
44 OwnedImagePlanes,
45 PatchIdentifiers,
46 SingleCellCoadd,
47 StitchedCoadd,
48 UniformGrid,
49)
50from lsst.meas.algorithms import SingleGaussianPsf
51from lsst.skymap import Index2D
54class BaseMultipleCellCoaddTestCase(lsst.utils.tests.TestCase):
55 """A base class that provides a common set of methods."""
57 psf_size: int
58 psf_sigmas: Mapping[Index2D, float]
59 border_size: int
60 inner_size: int
61 outer_size: int
62 test_positions: Iterable[tuple[geom.Point2D, Index2D]]
63 exposures: Mapping[Index2D, ExposureF]
64 multiple_cell_coadd: MultipleCellCoadd
66 @classmethod
67 def setUpClass(cls) -> None:
68 """Set up a multiple cell coadd with 2x2 cells."""
69 np.random.seed(42)
71 cls.nx, cls.ny = 3, 2
72 cls.psf_sigmas = {
73 Index2D(x=0, y=0): 1.2,
74 Index2D(x=0, y=1): 0.7,
75 Index2D(x=1, y=0): 0.9,
76 Index2D(x=1, y=1): 1.1,
77 Index2D(x=2, y=0): 1.3,
78 Index2D(x=2, y=1): 0.8,
79 }
81 cls.border_size = 5
82 # In practice, we expect this to be squares.
83 # To check for any possible x, y swap, we use different values.
84 cls.psf_size_x, cls.psf_size_y = 21, 23
85 cls.inner_size_x, cls.inner_size_y = 17, 15
86 cls.outer_size_x = cls.inner_size_x + 2 * cls.border_size
87 cls.outer_size_y = cls.inner_size_y + 2 * cls.border_size
88 # The origin should not be at (0, 0) for robust testing.
89 cls.x0, cls.y0 = 5, 2
90 cls.n_noise_realizations = 2
92 patch_outer_bbox = geom.Box2I(
93 geom.Point2I(cls.x0, cls.y0), geom.Extent2I(cls.nx * cls.inner_size_x, cls.ny * cls.inner_size_y)
94 )
95 patch_outer_bbox.grow(cls.border_size)
97 # Add one star and one galaxy per quadrant.
98 # The mapping of positions to cell indices assume inner_size = (17, 15)
99 # and border_size = 5. If that is changed, these values need an update.
100 sources = (
101 # flux, centroid, shape
102 (1000.0, geom.Point2D(cls.x0 + 6.3, cls.y0 + 7.2), None),
103 (2500.0, geom.Point2D(cls.x0 + 16.8, cls.y0 + 18.3), None),
104 (1500.0, geom.Point2D(cls.x0 + 21.2, cls.y0 + 5.1), None),
105 (3200.0, geom.Point2D(cls.x0 + 16.1, cls.y0 + 23.9), None),
106 (1800.0, geom.Point2D(cls.x0 + 44.7, cls.y0 + 8.9), None),
107 (2100.0, geom.Point2D(cls.x0 + 34.1, cls.y0 + 19.2), None),
108 (900.0, geom.Point2D(cls.x0 + 9.1, cls.y0 + 13.9), Quadrupole(2.5, 1.5, 0.8)),
109 (1250.0, geom.Point2D(cls.x0 + 19.3, cls.y0 + 11.2), Quadrupole(1.5, 2.5, 0.75)),
110 (2100.0, geom.Point2D(cls.x0 + 5.1, cls.y0 + 21.2), Quadrupole(1.7, 1.9, 0.05)),
111 (2800.0, geom.Point2D(cls.x0 + 24.1, cls.y0 + 19.2), Quadrupole(1.9, 1.7, 0.1)),
112 (2350.0, geom.Point2D(cls.x0 + 40.3, cls.y0 + 13.9), Quadrupole(1.8, 1.8, -0.4)),
113 (4999.0, geom.Point2D(cls.x0 + 45.8, cls.y0 + 22.0), Quadrupole(1.6, 1.2, 0.2)),
114 )
116 # The test points are chosen to cover various corner cases assuming
117 # inner_size = (17, 15) and border_size = 5. If that is changed, the
118 # test points should be updated to not fall outside the coadd and still
119 # cover the description in the inline comments. This is checked below.
120 test_points = (
121 geom.Point2D(cls.x0 + 5, cls.y0 + 4), # inner point in lower left
122 geom.Point2D(cls.x0 + 6, cls.y0 + 24), # inner point in upper left
123 geom.Point2D(cls.x0 + 25.2, cls.y0 + 7.8), # inner point in lower middle
124 geom.Point2D(cls.x0 + 23, cls.y0 + 22), # inner point in upper middle
125 geom.Point2D(cls.x0 + 39, cls.y0 + 9.4), # inner point in lower right
126 geom.Point2D(cls.x0 + 44, cls.y0 + 24), # inner point in upper right
127 # Some points that lie on the border
128 geom.Point2D(cls.x0 + 33, cls.y0 + 24), # inner point in upper right
129 geom.Point2D(cls.x0 + 46, cls.y0 + 0), # inner point in lower right
130 geom.Point2D(cls.x0 + 19, cls.y0 + 16), # inner point in upper middle
131 geom.Point2D(cls.x0 + 17, cls.y0 + 8), # inner point in lower middle
132 geom.Point2D(cls.x0 + 0, cls.y0 + 29), # inner point in upper left
133 geom.Point2D(cls.x0 + 0, cls.y0 + 0), # inner point in lower left
134 )
135 # A list of (point, cell_index) pairs.
136 cls.test_positions = [
137 (
138 point,
139 Index2D(
140 x=int((point.getX() - cls.x0) // cls.inner_size_x),
141 y=int((point.getY() - cls.y0) // cls.inner_size_y),
142 ),
143 )
144 for point in test_points
145 ]
147 schema = lsst.meas.base.tests.TestDataset.makeMinimalSchema()
149 single_cell_coadds = []
150 cls.exposures = dict.fromkeys(cls.psf_sigmas.keys())
152 data_id = test_utils.generate_data_id()
153 common = CommonComponents(
154 units=CoaddUnits.nJy, # units here are arbitrary.
155 wcs=test_utils.generate_wcs(),
156 band=data_id["band"],
157 identifiers=PatchIdentifiers.from_data_id(data_id),
158 visit_polygons={ # This is arbitrary, just to check it if it goes through FITS persistence.
159 ObservationIdentifiers(
160 instrument="dummy",
161 physical_filter="dummy-I",
162 visit=12345,
163 detector=67,
164 day_obs=20000101,
165 ): Polygon(geom.Box2D(patch_outer_bbox))
166 },
167 )
169 for x in range(cls.nx):
170 for y in range(cls.ny):
171 identifiers = CellIdentifiers(
172 cell=Index2D(x=x, y=y),
173 skymap=common.identifiers.skymap,
174 tract=common.identifiers.tract,
175 patch=common.identifiers.patch,
176 band=common.identifiers.band,
177 )
179 outer_bbox = geom.Box2I(
180 geom.Point2I(cls.x0 + x * cls.inner_size_x, cls.y0 + y * cls.inner_size_y),
181 geom.Extent2I(cls.inner_size_x, cls.inner_size_y),
182 )
183 outer_bbox.grow(cls.border_size)
185 dataset = lsst.meas.base.tests.TestDataset(
186 patch_outer_bbox, psfSigma=cls.psf_sigmas[identifiers.cell]
187 )
189 for inst_flux, position, shape in sources:
190 dataset.addSource(inst_flux, position, shape)
192 # Create a spatially varying variance plane.
193 variance = ImageF(
194 # np.random.uniform returns an array with x-y flipped.
195 np.random.uniform(
196 0.8,
197 1.2,
198 (
199 cls.ny * cls.inner_size_y + 2 * cls.border_size,
200 cls.nx * cls.inner_size_x + 2 * cls.border_size,
201 ),
202 ).astype(np.float32),
203 xy0=outer_bbox.getMin(),
204 )
205 noise_realizations = [
206 ImageF(
207 np.random.normal(
208 0.0,
209 1.0,
210 (
211 cls.ny * cls.inner_size_y + 2 * cls.border_size,
212 cls.nx * cls.inner_size_x + 2 * cls.border_size,
213 ),
214 ).astype(np.float32)
215 * variance.array,
216 xy0=outer_bbox.getMin(),
217 )
218 for _ in range(cls.n_noise_realizations)
219 ]
220 mask_fraction = ImageF(
221 np.random.uniform(
222 0.0,
223 1.0,
224 (
225 cls.ny * cls.inner_size_y + 2 * cls.border_size,
226 cls.nx * cls.inner_size_x + 2 * cls.border_size,
227 ),
228 ).astype(np.float32),
229 xy0=outer_bbox.getMin(),
230 )
231 exposure, _ = dataset.realize(variance.getArray() ** 0.5, schema, randomSeed=123456789)
232 cls.exposures[identifiers.cell] = exposure
233 exposure = exposure[outer_bbox]
234 image_plane = OwnedImagePlanes(
235 image=exposure.image,
236 variance=exposure.variance,
237 mask=exposure.mask,
238 noise_realizations=noise_realizations,
239 mask_fractions=mask_fraction,
240 )
241 aperture_correction_map = frozendict(
242 base_GaussianFlux_instFlux=0.9 * (x + 1),
243 base_GaussianFlux_instFluxErr=0.01 * (y + 1),
244 base_PsfFlux_instFlux=0.8 * (y + 2),
245 base_PsfFlux_instFluxErr=0.02 * (x + 2),
246 )
248 single_cell_coadds.append(
249 SingleCellCoadd(
250 outer=image_plane,
251 psf=SingleGaussianPsf(
252 cls.psf_size_x, cls.psf_size_y, cls.psf_sigmas[Index2D(x=x, y=y)]
253 ).computeKernelImage(outer_bbox.getCenter()),
254 inner_bbox=geom.Box2I(
255 geom.Point2I(cls.x0 + x * cls.inner_size_x, cls.y0 + y * cls.inner_size_y),
256 geom.Extent2I(cls.inner_size_x, cls.inner_size_y),
257 ),
258 inputs={
259 ObservationIdentifiers(
260 instrument="dummy",
261 physical_filter="dummy-I",
262 visit=12345,
263 detector=67,
264 day_obs=20000101,
265 ): CoaddInputs(
266 overlaps_center=True,
267 overlap_fraction=1.0,
268 unmasked_overlap_fraction=1.0,
269 weight=1.0,
270 psf_shape=Quadrupole(
271 cls.psf_sigmas[Index2D(x=x, y=y)] ** 2,
272 cls.psf_sigmas[Index2D(x=x, y=y)] ** 2,
273 ),
274 psf_shape_flag=False,
275 )
276 },
277 common=common,
278 identifiers=identifiers,
279 aperture_correction_map=aperture_correction_map,
280 )
281 )
283 grid_bbox = geom.Box2I(
284 geom.Point2I(cls.x0, cls.y0), geom.Extent2I(cls.nx * cls.inner_size_x, cls.ny * cls.inner_size_y)
285 )
286 grid = UniformGrid.from_bbox_shape(grid_bbox, Index2D(x=cls.nx, y=cls.ny), padding=cls.border_size)
288 for test_point, test_index in cls.test_positions:
289 assert test_index == grid.index(
290 geom.Point2I(test_point)
291 ), f"Test point {test_point} is not in cell {test_index}."
293 cls.multiple_cell_coadd = MultipleCellCoadd(
294 single_cell_coadds,
295 grid=grid,
296 outer_cell_size=geom.Extent2I(cls.outer_size_x, cls.outer_size_y),
297 inner_bbox=None,
298 common=common,
299 psf_image_size=geom.Extent2I(cls.psf_size_x, cls.psf_size_y),
300 )
302 @classmethod
303 def tearDownClass(cls) -> None: # noqa: D102
304 # Docstring inherited
305 del cls.multiple_cell_coadd
306 del cls.exposures
307 super().tearDownClass()
309 def assertMultipleCellCoaddsEqual(self, mcc1: MultipleCellCoadd, mcc2: MultipleCellCoadd) -> None:
310 """Check the equality of two instances of `MultipleCellCoadd`.
312 Parameters
313 ----------
314 mcc1 : `MultipleCellCoadd`
315 The MultipleCellCoadd created by reading a FITS file.
316 mcc2 : `MultipleCellCoadd`
317 The reference MultipleCellCoadd for comparison.
318 """
319 self.assertEqual(mcc1.band, mcc2.band)
320 self.assertEqual(mcc1.identifiers, mcc2.identifiers)
321 self.assertEqual(mcc1.inner_bbox, mcc2.inner_bbox)
322 self.assertEqual(mcc1.outer_bbox, mcc2.outer_bbox)
323 self.assertEqual(mcc1.outer_cell_size, mcc2.outer_cell_size)
324 self.assertEqual(mcc1.grid, mcc2.grid)
325 self.assertEqual(mcc1.mask_fraction_names, mcc2.mask_fraction_names)
326 self.assertEqual(mcc1.n_noise_realizations, mcc2.n_noise_realizations)
327 self.assertEqual(mcc1.psf_image_size, mcc2.psf_image_size)
328 self.assertEqual(mcc1.units, mcc2.units)
329 self.assertEqual(mcc1.wcs.getFitsMetadata().toString(), mcc2.wcs.getFitsMetadata().toString())
331 # Check that the individual cells are identical.
332 self.assertEqual(mcc1.cells.keys(), mcc2.cells.keys())
333 for idx in mcc1.cells.keys(): # noqa: SIM118
334 self.assertImagesEqual(mcc1.cells[idx].outer.image, mcc2.cells[idx].outer.image)
335 self.assertMasksEqual(mcc1.cells[idx].outer.mask, mcc2.cells[idx].outer.mask)
336 self.assertImagesEqual(mcc1.cells[idx].outer.variance, mcc2.cells[idx].outer.variance)
337 self.assertImagesEqual(mcc1.cells[idx].outer.mask_fractions, mcc2.cells[idx].outer.mask_fractions)
338 self.assertEqual(
339 len(mcc1.cells[idx].outer.noise_realizations), len(mcc2.cells[idx].outer.noise_realizations)
340 )
341 for noise_id in range(len(mcc1.cells[idx].outer.noise_realizations)):
342 with self.subTest(noise_id):
343 self.assertImagesEqual(
344 mcc1.cells[idx].outer.noise_realizations[noise_id],
345 mcc2.cells[idx].outer.noise_realizations[noise_id],
346 )
347 self.assertImagesEqual(mcc1.cells[idx].psf_image, mcc2.cells[idx].psf_image)
348 self.assertEqual(mcc1.cells[idx].inputs, mcc2.cells[idx].inputs)
350 self.assertEqual(mcc1.cells[idx].band, mcc1.band)
351 self.assertEqual(mcc1.cells[idx].common, mcc1.common)
352 self.assertEqual(mcc1.cells[idx].units, mcc2.units)
353 self.assertEqual(mcc1.cells[idx].wcs, mcc1.wcs)
354 self.assertEqual(mcc1.cells[idx].aperture_correction_map, mcc2.cells[idx].aperture_correction_map)
356 # Identifiers differ because of the ``cell`` component.
357 # Check the other attributes within the identifiers.
358 for attr in ("skymap", "tract", "patch", "band"):
359 self.assertEqual(getattr(mcc1.cells[idx].identifiers, attr), getattr(mcc1.identifiers, attr))
362class MultipleCellCoaddTestCase(BaseMultipleCellCoaddTestCase):
363 """Test the construction and interfaces of MultipleCellCoadd."""
365 def test_fits(self):
366 """Test that we can write a coadd to a FITS file and read it."""
367 with lsst.utils.tests.getTempFilePath(".fits") as filename:
368 self.multiple_cell_coadd.write_fits(filename)
369 mcc1 = MultipleCellCoadd.read_fits(filename) # Test the readFits method.
371 # Test the reader class.
372 reader = CellCoaddFitsReader(filename)
373 mcc2 = reader.readAsMultipleCellCoadd()
375 wcs = reader.readWcs()
377 self.assertMultipleCellCoaddsEqual(mcc1, self.multiple_cell_coadd)
378 self.assertMultipleCellCoaddsEqual(mcc2, self.multiple_cell_coadd)
379 # By transititve property of equality, mcc1 == mcc2.
381 self.assertEqual(self.multiple_cell_coadd.band, self.multiple_cell_coadd.common.band)
382 for poly in self.multiple_cell_coadd.common.visit_polygons.values():
383 self.assertEqual(len(poly), 4)
384 self.assertEqual(
385 wcs.getFitsMetadata().toString(), self.multiple_cell_coadd.wcs.getFitsMetadata().toString()
386 )
388 def test_visit_count(self):
389 """Test the visit_count method."""
390 # Since we don't simulate coaddition from multiple warps, the cells are
391 # all going to have just a single visit.
392 for cellId, singleCellCoadd in self.multiple_cell_coadd.cells.items():
393 with self.subTest(x=cellId.x, y=cellId.y):
394 self.assertEqual(singleCellCoadd.visit_count, 1)
396 def test_aperture_correction(self):
397 """Test the aperture correction values are what we expect."""
398 for cellId, single_cell_coadd in self.multiple_cell_coadd.cells.items():
399 with self.subTest(x=cellId.x, y=cellId.y):
400 ap_corr_map = single_cell_coadd.aperture_correction_map
401 self.assertIsNotNone(ap_corr_map)
402 self.assertEqual(ap_corr_map["base_GaussianFlux_instFlux"], 0.9 * (cellId.x + 1))
403 self.assertEqual(ap_corr_map["base_GaussianFlux_instFluxErr"], 0.01 * (cellId.y + 1))
404 self.assertEqual(ap_corr_map["base_PsfFlux_instFlux"], 0.8 * (cellId.y + 2))
405 self.assertEqual(ap_corr_map["base_PsfFlux_instFluxErr"], 0.02 * (cellId.x + 2))
408class ExplodedCoaddTestCase(BaseMultipleCellCoaddTestCase):
409 """Test the construction and methods of an ExplodedCoadd instance."""
411 exploded_coadd: ExplodedCoadd
413 @classmethod
414 def setUpClass(cls) -> None: # noqa: D102
415 # Docstring inherited
416 super().setUpClass()
417 cls.exploded_coadd = cls.multiple_cell_coadd.explode()
419 @classmethod
420 def tearDownClass(cls) -> None: # noqa: D102
421 # Docstring inherited
422 del cls.exploded_coadd
423 super().tearDownClass()
425 def test_exploded_psf_image(self):
426 """Show that psf_image sizes are absurd."""
427 self.assertEqual(
428 self.exploded_coadd.psf_image.getBBox().getDimensions(),
429 geom.Extent2I(self.nx * self.psf_size_x, self.ny * self.psf_size_y),
430 )
431 for pad_psfs_with in (-999, -4, 0, 4, 8, 21, 40, 100):
432 exploded_coadd = self.multiple_cell_coadd.explode(pad_psfs_with=pad_psfs_with)
433 self.assertEqual(
434 exploded_coadd.psf_image.getBBox().getDimensions(),
435 geom.Extent2I(self.nx * self.outer_size_x, self.ny * self.outer_size_y),
436 )
438 def test_asMaskedImage(self):
439 """Test the asMaskedImage method for an ExplodedCoadd object."""
440 masked_image = self.exploded_coadd.asMaskedImage()
441 masked_image.setXY0(self.multiple_cell_coadd.outer_bbox.getMin())
442 base_bbox = self.multiple_cell_coadd.cells.first.outer.bbox
443 for cell_x, cell_y in product(range(self.nx), range(self.ny)):
444 bbox = base_bbox.shiftedBy(geom.Extent2I(cell_x * self.outer_size_x, cell_y * self.outer_size_y))
445 with self.subTest(cell_x=cell_x, cell_y=cell_y):
446 self.assertMaskedImagesEqual(
447 masked_image[bbox],
448 self.multiple_cell_coadd.cells[Index2D(cell_x, cell_y)].outer.asMaskedImage(),
449 )
452class StitchedCoaddTestCase(BaseMultipleCellCoaddTestCase):
453 """Test the construction and methods of a StitchedCoadd instance."""
455 stitched_coadd: StitchedCoadd
457 @classmethod
458 def setUpClass(cls) -> None: # noqa: D102
459 # Docstring inherited
460 super().setUpClass()
461 cls.stitched_coadd = cls.multiple_cell_coadd.stitch()
463 @classmethod
464 def tearDownClass(cls) -> None: # noqa: D102
465 # Docstring inherited
466 del cls.stitched_coadd
467 super().tearDownClass()
469 def test_computeBBox(self):
470 """Test the computeBBox method for a StitchedPsf object."""
471 stitched_psf = self.stitched_coadd.psf
473 psf_bbox = geom.Box2I(
474 geom.Point2I(-(self.psf_size_x // 2), -(self.psf_size_y // 2)),
475 geom.Extent2I(self.psf_size_x, self.psf_size_y),
476 )
478 for position, _ in self.test_positions:
479 bbox = stitched_psf.computeBBox(position)
480 self.assertEqual(bbox, psf_bbox)
482 def test_computeShape(self):
483 """Test the computeShape method for a StitchedPsf object."""
484 stitched_psf = self.stitched_coadd.psf
485 for position, cell_index in self.test_positions:
486 psf_shape = stitched_psf.computeShape(position) # check we can compute shape
487 self.assertIsNot(psf_shape.getIxx(), np.nan)
488 self.assertIsNot(psf_shape.getIyy(), np.nan)
489 self.assertIsNot(psf_shape.getIxy(), np.nan)
491 # Moments measured from pixellated images are significantly
492 # underestimated for small PSFs.
493 if self.psf_sigmas[cell_index] >= 1.0:
494 self.assertAlmostEqual(psf_shape.getIxx(), self.psf_sigmas[cell_index] ** 2, delta=1e-3)
495 self.assertAlmostEqual(psf_shape.getIyy(), self.psf_sigmas[cell_index] ** 2, delta=1e-3)
496 self.assertAlmostEqual(psf_shape.getIxy(), 0.0)
498 def test_computeKernelImage(self):
499 """Test the computeKernelImage method for a StitchedPsf object."""
500 stitched_psf = self.stitched_coadd.psf
501 psf_bbox = geom.Box2I(
502 geom.Point2I(-(self.psf_size_x // 2), -(self.psf_size_y // 2)),
503 geom.Extent2I(self.psf_size_x, self.psf_size_y),
504 )
506 for position, cell_index in self.test_positions:
507 image1 = stitched_psf.computeKernelImage(position)
508 image2 = SingleGaussianPsf(
509 self.psf_size_x, self.psf_size_y, self.psf_sigmas[cell_index]
510 ).computeKernelImage(position)
511 self.assertImagesEqual(image1, image2)
512 self.assertEqual(image1.getBBox(), psf_bbox)
514 def test_computeImage(self):
515 """Test the computeImage method for a StitchedPsf object."""
516 stitched_psf = self.stitched_coadd.psf
517 psf_extent = geom.Extent2I(self.psf_size_x, self.psf_size_y)
519 for position, cell_index in self.test_positions:
520 image1 = stitched_psf.computeImage(position)
521 image2 = SingleGaussianPsf(
522 self.psf_size_x, self.psf_size_y, self.psf_sigmas[cell_index]
523 ).computeImage(position)
524 self.assertImagesEqual(image1, image2)
525 self.assertEqual(image1.getBBox().getDimensions(), psf_extent)
527 def test_computeImage_computeKernelImage(self):
528 """Test that computeImage called at integer points gives the same
529 result as calling computeKernelImage.
530 """
531 stitched_psf = self.stitched_coadd.psf
532 for position, _cell_index in self.test_positions:
533 pos = geom.Point2D(geom.Point2I(position)) # round to integer
534 image1 = stitched_psf.computeKernelImage(pos)
535 image2 = stitched_psf.computeImage(pos)
536 self.assertImagesEqual(image1, image2)
538 def test_computeApetureFlux(self):
539 """Test the computeApertureFlux method for a StitchedPsf object."""
540 stitched_psf = self.stitched_coadd.psf
541 for position, cell_index in self.test_positions:
542 flux1sigma = stitched_psf.computeApertureFlux(self.psf_sigmas[cell_index], position=position)
543 self.assertAlmostEqual(flux1sigma, 0.39, delta=5e-2)
545 flux3sigma = stitched_psf.computeApertureFlux(
546 3.0 * self.psf_sigmas[cell_index], position=position
547 )
548 self.assertAlmostEqual(flux3sigma, 0.97, delta=2e-2)
550 def test_asExposure(self):
551 """Test the asExposure method for a StitchedCoadd object."""
552 exposure = self.stitched_coadd.asExposure()
554 # Check that the bounding box is correct.
555 bbox = exposure.getBBox()
556 self.assertEqual(bbox.getWidth(), self.inner_size_x * self.nx + 2 * self.border_size)
557 self.assertEqual(bbox.getHeight(), self.inner_size_y * self.ny + 2 * self.border_size)
559 for y in range(self.ny):
560 for x in range(self.nx):
561 bbox = geom.Box2I(
562 geom.Point2I(self.x0 + x * self.inner_size_x, self.y0 + y * self.inner_size_y),
563 geom.Extent2I(self.inner_size_x, self.inner_size_y),
564 )
565 index = Index2D(x=x, y=y)
566 self.assertMaskedImagesEqual(exposure[bbox], self.exposures[index][bbox])
568 self.assertMaskedImagesEqual(self.stitched_coadd.asExposure(noise_index=None), exposure)
569 for noise_index in range(self.n_noise_realizations):
570 noise_exposure = self.stitched_coadd.asExposure(noise_index=noise_index)
572 self.assertImagesEqual(noise_exposure.variance, exposure.variance)
573 self.assertImagesEqual(noise_exposure.mask, exposure.mask)
575 for y in range(self.ny):
576 for x in range(self.nx):
577 bbox = geom.Box2I(
578 geom.Point2I(self.x0 + x * self.inner_size_x, self.y0 + y * self.inner_size_y),
579 geom.Extent2I(self.inner_size_x, self.inner_size_y),
580 )
581 index = Index2D(x=x, y=y)
582 self.assertImagesEqual(
583 noise_exposure.image[bbox],
584 self.multiple_cell_coadd.cells[index].outer.noise_realizations[noise_index][bbox],
585 )
587 with self.assertRaises(ValueError):
588 self.stitched_coadd.asExposure(noise_index=self.n_noise_realizations)
590 def test_aperture_correction(self):
591 """Test the aperture correction values are what we expect."""
592 ap_corr_map = self.stitched_coadd.ap_corr_map
593 algorithm_names = ap_corr_map.keys()
595 for (position, cellId), algorithm_name in product(self.test_positions, algorithm_names):
596 with self.subTest(x=cellId.x, y=cellId.y, algorithm_name=algorithm_name):
597 field_name = algorithm_name
598 self.assertEqual(
599 ap_corr_map[field_name].evaluate(position),
600 self.multiple_cell_coadd.cells[cellId].aperture_correction_map[field_name],
601 )
603 def test_inputs(self):
604 """Test that the inputs are populated correctly on stitching."""
605 inputs = self.stitched_coadd.ccds
606 for position, cellId in self.test_positions:
607 with self.subTest(x=cellId.x, y=cellId.y):
608 self.assertEqual(
609 inputs.subsetContaining(position),
610 tuple(self.multiple_cell_coadd.cells[cellId].inputs.keys()),
611 )
612 self.assertEqual(len(self.stitched_coadd.visits), 1)
614 def test_coaddInputs(self):
615 """Test that the inputs are populated when converted to Exposure."""
616 inputs = self.stitched_coadd.asExposure().getInfo().getCoaddInputs()
617 for position, _ in self.test_positions:
618 ccds = inputs.subset_containing_ccds(position, None)
619 visits = inputs.subset_containing_visits(position, None)
620 with self.subTest(x=position.x, y=position.y):
621 self.assertEqual(len(ccds), 1)
622 self.assertEqual(len(visits), 1)
624 def test_borders(self):
625 """Test that the borders are populated correctly on stitching."""
626 mi = self.stitched_coadd.asMaskedImage()
628 # Check that the borders are not all zero.
629 with self.assertRaises(AssertionError):
630 np.testing.assert_array_equal(mi.image.array[: self.border_size, :], 0)
631 with self.assertRaises(AssertionError):
632 np.testing.assert_array_equal(mi.image.array[-self.border_size :, :], 0)
633 with self.assertRaises(AssertionError):
634 np.testing.assert_array_equal(mi.image.array[:, : self.border_size], 0)
635 with self.assertRaises(AssertionError):
636 np.testing.assert_array_equal(mi.image.array[:, -self.border_size :], 0)
638 def test_fits(self):
639 """Test that we can write an Exposure with StitchedPsf to a FITS file
640 and read it.
641 """
642 write_exposure = self.stitched_coadd.asExposure()
643 with lsst.utils.tests.getTempFilePath(".fits") as filename:
644 write_exposure.writeFits(filename)
645 read_exposure = ExposureF.readFits(filename) # Test the readFits method.
647 # Test that the image planes are identical.
648 self.assertMaskedImagesEqual(read_exposure, write_exposure)
650 # Test the PSF images in the StitchedPsf.
651 for index in write_exposure.psf.images.indices():
652 self.assertImagesEqual(read_exposure.psf.images[index], write_exposure.psf.images[index])
654 # Test that the WCSs are equal.
655 self.assertEqual(
656 read_exposure.wcs.getFitsMetadata().toString(),
657 write_exposure.wcs.getFitsMetadata().toString(),
658 )
661class TestMemory(lsst.utils.tests.MemoryTestCase):
662 """Check for resource/memory leaks."""
665def setup_module(module): # noqa: D103
666 lsst.utils.tests.init()
669if __name__ == "__main__": 669 ↛ 670line 669 didn't jump to line 670 because the condition on line 669 was never true
670 lsst.utils.tests.init()
671 unittest.main()