Coverage for tests / test_trailedEdgeSources.py: 16%
193 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-23 01:19 -0700
1#
2# This file is part of meas_extensions_trailedSources.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24import numpy as np
25import unittest
26import lsst.utils.tests
27import lsst.meas.extensions.trailedSources
28from lsst.meas.base.tests import AlgorithmTestCase
29from lsst.utils.tests import classParameters
30from lsst.geom import Point2I, Point2D, Box2I, Extent2I
31from unittest.mock import patch
33# Trailed-source length, angle, and centroid coordinates.
# One (length, angle, x centroid, y centroid) tuple per parameterized case;
# the columns are split out into the arrays handed to `classParameters`.
_TRAIL_CASES = (
    (5, 100, 100, 100),
    (5, 0, 20, 20),
    (10, 5, -20, -30),
    (4, 4, 90, 100),
)
trail_lengths, trail_angles, trail_x_coords, trail_y_coords = (
    np.array(column) for column in zip(*_TRAIL_CASES)
)
class TrailedEdgeSource:
    """Hold the true parameters of a single simulated trail.

    Parameters
    ----------
    instFlux : `float`
        Instrumental flux assigned to the trail.
    length : `float`
        Length of the trail.
    angle : `float`
        Orientation of the trail; passed unchanged to `numpy.cos` and
        `numpy.sin`.
    xc, yc : `float`
        Coordinates of the trail midpoint.

    Notes
    -----
    The end points ``(x0, y0)`` and ``(x1, y1)`` are placed half a trail
    length on either side of the midpoint along ``angle``.
    """

    def __init__(self, instFlux, length, angle, xc, yc):
        # Offset from the midpoint to an end point along each axis.
        half_dx = length / 2 * np.cos(angle)
        half_dy = length / 2 * np.sin(angle)

        self.instFlux = instFlux
        self.length = length
        self.angle = angle
        self.center = Point2D(xc, yc)
        self.x0, self.y0 = xc - half_dx, yc - half_dy
        self.x1, self.y1 = xc + half_dx, yc + half_dy
class TrailedTaskSetup:
    """Mixin-style helper that builds measurement tasks for trailed-source
    plugins.

    The method below relies on the host object providing
    ``makeSingleFrameMeasurementConfig`` and
    ``makeSingleFrameMeasurementTask``.
    """

    def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
                                         config=None, schema=None,
                                         algMetadata=None):
        """Set up a measurement task for a trailed source plugin.

        Parameters
        ----------
        plugin : `str`, optional
            Name of the plugin to run.
        dependencies : iterable of `str`, optional
            Names of plugins the measured plugin depends on.
        config : optional
            Measurement configuration to use. When `None`, one is built with
            ``self.makeSingleFrameMeasurementConfig``. A supplied config is
            used as given, apart from the shape slot (see below).
        schema : optional
            Forwarded to ``self.makeSingleFrameMeasurementTask``.
        algMetadata : optional
            Forwarded to ``self.makeSingleFrameMeasurementTask``.

        Returns
        -------
        task
            Whatever ``self.makeSingleFrameMeasurementTask`` returns.
        """
        # Only build a default configuration when the caller did not supply
        # one; previously a caller-provided config was silently discarded.
        if config is None:
            config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
                                                           dependencies=dependencies)

        # Make sure the shape slot is base_SdssShape
        config.slots.shape = "base_SdssShape"
        return self.makeSingleFrameMeasurementTask(plugin=plugin,
                                                   dependencies=dependencies,
                                                   config=config,
                                                   schema=schema,
                                                   algMetadata=algMetadata)
74# "Extend" meas.base.tests.TestDataset
class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
    """Test dataset that can hold simulated trailed sources.

    Extends `lsst.meas.base.tests.TestDataset` with `addTrailedSource`, which
    records the true values of a `TrailedEdgeSource` in the truth catalog and
    draws the trail into the dataset's exposure.
    """

    def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
        # Pure pass-through to the base-class constructor.
        super().__init__(bbox, threshold, exposure, **kwds)

    def addTrailedSource(self, trail, edge=True):
        """Add a trailed source to the simulation.

        Re-implemented version of
        `lsst.meas.base.tests.TestDataset.addSource`. A Gaussian with the PSF
        shape is drawn at evenly spaced points along the trail and summed
        into the image, which approximates the PSF integrated over a line.
        Optionally, bands of ``EDGE`` mask bits are set around the border of
        the exposure.

        Parameters
        ----------
        trail : `TrailedEdgeSource`
            True parameters of the trail to draw.
        edge : `bool`, optional
            If `True` (default), set the ``EDGE`` mask plane around the
            border of the exposure.

        Returns
        -------
        record
            The new truth-catalog record for the trail.
        image
            The image of the dataset's exposure.
        """
        truth = self.catalog.addNew()
        truth.set(self.keys["centroid"], trail.center)

        # Fake centroid covariance drawn from a fixed seed so that it is
        # reproducible; the off-diagonal terms are forced to be equal.
        rng = np.random.default_rng(32)
        covariance = rng.normal(0, 0.1, 4).reshape(2, 2)
        covariance[0, 1] = covariance[1, 0]
        truth.set(self.keys["centroid_sigma"], covariance.astype(np.float32))
        truth.set(self.keys["shape"], self.psfShape)
        truth.set(self.keys["isStar"], False)

        # Sum a PSF-shaped Gaussian at each sample point along the trail,
        # using two samples per unit of trail length.
        nSamples = int(2 * trail.length)
        xSamples = np.linspace(trail.x0, trail.x1, num=nSamples)
        ySamples = np.linspace(trail.y0, trail.y1, num=nSamples)
        for xSample, ySample in zip(xSamples, ySamples):
            stamp = self.drawGaussian(
                self.exposure.getBBox(),
                trail.instFlux,
                lsst.afw.geom.Ellipse(self.psfShape, Point2D(xSample, ySample)),
            )
            self.exposure.getMaskedImage().getImage().getArray()[:, :] += stamp.getArray()

        planes = self.exposure.mask.getMaskPlaneDict()
        dim = self.exposure.getBBox().getDimensions()

        if edge:
            edgePlane = planes['EDGE']
            width, height = dim[0], dim[1]

            def flagSpan(xStart, xStop, row):
                # Set the EDGE plane for columns xStart..xStop of one row.
                self.exposure.mask.setMaskPlaneValues(edgePlane, xStart, xStop, row)

            # Full-width bands over the first and last 20 rows.
            for row in range(20):
                flagSpan(0, width - 1, row)
                flagSpan(0, width - 1, row + height - 20)

            # Bands down the left and right sides of every row.
            # NOTE(review): the left span is 0..20 while the other bands are
            # 20 wide; if the span is inclusive this flags 21 columns on the
            # left -- confirm whether that is intended.
            for row in range(height):
                flagSpan(0, 20, row)
                flagSpan(width - 20, width - 1, row)

        # Rescale the whole image so that its summed flux is trail.instFlux.
        imageSum = self.exposure.image.array.sum()
        self.exposure.image.array /= imageSum
        self.exposure.image.array *= trail.instFlux

        truth.set(self.keys["instFlux"], trail.instFlux)
        self._installFootprint(truth, self.exposure.getImage())

        return truth, self.exposure.getImage()
135# Following from test_trailedSources
@classParameters(length=trail_lengths, theta=trail_angles, xc=trail_x_coords, yc=trail_y_coords)
class TrailedEdgeSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test that the ``ext_trailedSources_Naive`` flags are set correctly.

    For each parameterized `TrailedEdgeSource`, a `TrailedTestDataset` with
    ``EDGE`` mask bits around its border is built, the measurement task
    created by `TrailedTaskSetup.makeTrailedSourceMeasurementTask` is run on
    it, and the ``flag_edge``, ``flag_off_image`` and ``flag_nan`` fields of
    the first catalog record are checked.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        self.bbox = Box2I(lsst.geom.Point2I(-20, -30), Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Trail for this parameter set; the edge mask bits are added too.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail)

    def tearDown(self):
        # Mirror TrailedEdgeSourcesOffImageTest: drop the per-test fixtures.
        del self.center
        del self.bbox
        del self.trail
        del self.dataset

    def _makeTaskAndData(self):
        """Build the Naive measurement task and realize the dataset.

        Returns
        -------
        task
            The measurement task for ``ext_trailedSources_Naive``.
        exposure, catalog
            The output of ``self.dataset.realize`` for that task's schema.
        """
        task = TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )
        exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
        return task, exposure, catalog

    @staticmethod
    def _isEdgePixel(exposure, x, y):
        """Return `True` if the mask pixel at ``(x, y)`` has ``EDGE`` set."""
        edgeBit = exposure.mask.getPlaneBitMask('EDGE')
        return (exposure.mask[Point2I(x, y)] & edgeBit) != 0

    def _assertNaiveFlags(self, record, edge, offImage, nan):
        """Check the plugin flags of ``record``.

        The general failure flag ``ext_trailedSources_Naive_flag`` is always
        expected to be unset; the other three flags are compared with the
        given booleans.
        """
        expected = {
            "ext_trailedSources_Naive_flag_edge": edge,
            "ext_trailedSources_Naive_flag": False,
            "ext_trailedSources_Naive_flag_off_image": offImage,
            "ext_trailedSources_Naive_flag_nan": nan,
        }
        for name, value in expected.items():
            self.assertEqual(bool(record.get(name)), value, msg=name)

    def testEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check the flags of the measured trail. The assertions expect that
        the trail at [100,100] has only its (x1, y1) end on an edge pixel
        and has the edge flag set, that the trails at [20,20] and [90,100]
        have neither end on an edge pixel and have no edge flag set, and
        that the trail at [-20,-30] has both the edge and off-image flags
        set.
        """
        # Set up and run Naive measurement.
        task, exposure, catalog = self._makeTaskAndData()
        task.run(catalog, exposure)
        record = catalog[0]

        # Measured end points, truncated to integer pixel indices.
        x1 = int(record['ext_trailedSources_Naive_x1'])
        y1 = int(record['ext_trailedSources_Naive_y1'])
        x0 = int(record['ext_trailedSources_Naive_x0'])
        y0 = int(record['ext_trailedSources_Naive_y0'])

        truthX = record['truth_x']

        # Trail with one end on edge pixels.
        if truthX == 100:
            # First make sure the mask itself has the expected EDGE bits at
            # the measured end points.
            self.assertFalse(self._isEdgePixel(exposure, x0, y0))
            self.assertTrue(self._isEdgePixel(exposure, x1, y1))

            # The edge flag must be set while the general failure flag is
            # not: the measurement succeeded but touches the edge.
            self._assertNaiveFlags(record, edge=True, offImage=False, nan=False)
            self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1)

        # Trails with neither end on edge pixels.
        elif truthX == 20 or truthX == 90:
            self.assertFalse(self._isEdgePixel(exposure, x0, y0))
            self.assertFalse(self._isEdgePixel(exposure, x1, y1))

            # Neither the edge flag nor the general failure flag is set.
            self._assertNaiveFlags(record, edge=False, offImage=False, nan=False)
            self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1)

        # Trail extending off the chip; the mask is not consulted here.
        else:
            self.assertEqual(truthX, -20)
            self._assertNaiveFlags(record, edge=True, offImage=True, nan=False)
            self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1)

    def testNanFlag(self):
        """Test if nan flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement with
        ``check_trail`` patched so that the ``y1`` end-point value it
        receives is always nan, and check the resulting flags.
        """
        # Set up the Naive measurement.
        task, exposure, catalog = self._makeTaskAndData()

        # Keep the real implementation so the replacement can delegate to it.
        originalCheckTrail = task.plugins['ext_trailedSources_Naive'].check_trail

        def check_trail_mock(*args, **kwargs):
            # Simulates a trailed source where one of the coordinates is a
            # nan: forward the call unchanged except for y1 (args[5]).
            measRecord = args[0]
            mockExposure = args[1]
            x0 = args[2]
            y0 = args[3]
            x1 = args[4]
            y1 = np.nan  # overriding to test NAN flagging
            length = args[6]
            measRecord['ext_trailedSources_Naive_y1'] = np.nan
            return originalCheckTrail(measRecord, mockExposure, x0, y0, x1, y1, length)

        # Patch check_trail only for the duration of the run. Using the
        # context manager guarantees the patch is removed even if the run
        # raises, so it cannot leak into other tests.
        with patch(
                'lsst.meas.extensions.trailedSources.NaivePlugin.SingleFrameNaiveTrailPlugin.check_trail',
                side_effect=check_trail_mock):
            task.run(catalog, exposure)
        record = catalog[0]

        truthX = record['truth_x']

        # On-image trails: with y1 forced to nan only the nan flag is
        # expected; the edge flag is not expected even for the trail at
        # x = 100.
        if truthX == 100 or truthX == 20 or truthX == 90:
            self._assertNaiveFlags(record, edge=False, offImage=False, nan=True)

        # Trail extending off the chip: one coordinate is off the image and
        # the other is nan, so edge, off_image, and nan should all be set.
        else:
            self.assertEqual(truthX, -20)
            self._assertNaiveFlags(record, edge=True, offImage=True, nan=True)
@classParameters(length=[10], theta=[5], xc=[-20], yc=[-30])
class TrailedEdgeSourcesOffImageTest(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test ``ext_trailedSources_Naive_flag_edge`` for an off-image trail.

    A single `TrailedEdgeSource` centred on the minimum corner of the
    bounding box is added to a `TrailedTestDataset` without any ``EDGE``
    mask bits, and the measurement task built by
    `TrailedTaskSetup.makeTrailedSourceMeasurementTask` is run on it.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        corner = lsst.geom.Point2I(-20, -30)
        self.bbox = Box2I(corner, Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Trail centred on the bbox corner; no EDGE mask bits are added.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail, edge=False)

    def tearDown(self):
        # Drop the per-test fixtures created in setUp.
        for attr in ("center", "bbox", "trail", "dataset"):
            delattr(self, attr)

    def testOffImageEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py when the
        source extends off the image.

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check that the edge flag is set when a source extends off the chip.
        Edge pixels are not set in this test.
        """
        # Set up and run Naive measurement.
        measTask = TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )
        exposure, catalog = self.dataset.realize(5.0, measTask.schema, randomSeed=0)
        measTask.run(catalog, exposure)
        measured = catalog[0]

        # Edge flag set, general failure flag unset.
        self.assertTrue(measured.get("ext_trailedSources_Naive_flag_edge"))
        self.assertFalse(measured.get("ext_trailedSources_Naive_flag"))
        self.assertEqual(measured.get("ext_trailedSources_Naive_algorithmKey"), 1)
class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from `lsst.utils.tests.MemoryTestCase`;
    nothing is added or overridden here.
    """
def setup_module(module):
    """Initialise the LSST test utilities before the module's tests run.

    Parameters
    ----------
    module
        Unused; accepted because the caller passes the module in
        (presumably pytest's module-level setup hook -- confirm).
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Executed directly as a script: initialise the LSST test utilities and
    # hand control to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()