Coverage for tests/test_trailedSources.py: 22%
253 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:17 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-27 08:17 +0000
1#
2# This file is part of meas_extensions_trailedSources.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24import numpy as np
25import unittest
26import lsst.utils.tests
27import lsst.meas.extensions.trailedSources
28from scipy.optimize import check_grad
29import lsst.afw.table as afwTable
30from lsst.meas.base.tests import AlgorithmTestCase
31from lsst.meas.extensions.trailedSources import SingleFrameNaiveTrailPlugin
32from lsst.meas.extensions.trailedSources import VeresModel
33from lsst.meas.extensions.trailedSources.utils import getMeasurementCutout
34from lsst.utils.tests import classParameters
35try:
36 import lsst.meas.extensions.shapeHSM # noqa: F401
37 HAS_HSM = True
38except ImportError:
39 HAS_HSM = False
42# Trailed-source length, angle, and centroid.
43rng = np.random.default_rng(432)
44nTrails = 50
45Ls = rng.uniform(2, 20, nTrails)
46thetas = rng.uniform(0, 2*np.pi, nTrails)
47xcs = rng.uniform(0, 100, nTrails)
48ycs = rng.uniform(0, 100, nTrails)
51class TrailedSource:
52 """Holds a set of true trail parameters.
53 """
55 def __init__(self, instFlux, length, angle, xc, yc):
56 self.instFlux = instFlux
57 self.length = length
58 self.angle = angle
59 self.center = lsst.geom.Point2D(xc, yc)
60 self.x0 = xc - length/2 * np.cos(angle)
61 self.y0 = yc - length/2 * np.sin(angle)
62 self.x1 = xc + length/2 * np.cos(angle)
63 self.y1 = yc + length/2 * np.sin(angle)
66# "Extend" meas.base.tests.TestDataset
67class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
68 """A dataset for testing trailed source measurements.
69 Given a `TrailedSource`, construct a record of the true values and an
70 Exposure.
71 """
73 def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
74 super().__init__(bbox, threshold, exposure, **kwds)
76 def addTrailedSource(self, trail):
77 """Add a trailed source to the simulation.
78 'Re-implemented' version of
79 `lsst.meas.base.tests.TestDataset.addSource`. Numerically integrates a
80 Gaussian PSF over a line to obtain am image of a trailed source.
81 """
83 record = self.catalog.addNew()
84 record.set(self.keys["centroid"], trail.center)
85 rng = np.random.default_rng(32)
86 covariance = rng.normal(0, 0.1, 4).reshape(2, 2)
87 covariance[0, 1] = covariance[1, 0]
88 record.set(self.keys["centroid_sigma"], covariance.astype(np.float32))
89 record.set(self.keys["shape"], self.psfShape)
90 record.set(self.keys["isStar"], False)
92 # Sum the psf at each
93 numIter = int(10*trail.length)
94 xp = np.linspace(trail.x0, trail.x1, num=numIter)
95 yp = np.linspace(trail.y0, trail.y1, num=numIter)
96 for (x, y) in zip(xp, yp):
97 pt = lsst.geom.Point2D(x, y)
98 im = self.drawGaussian(self.exposure.getBBox(), trail.instFlux,
99 lsst.afw.geom.Ellipse(self.psfShape, pt))
100 self.exposure.getMaskedImage().getImage().getArray()[:, :] += im.getArray()
102 totFlux = self.exposure.image.array.sum()
103 self.exposure.image.array /= totFlux
104 self.exposure.image.array *= trail.instFlux
106 record.set(self.keys["instFlux"], trail.instFlux)
107 self._installFootprint(record, self.exposure.getImage())
109 return record, self.exposure.getImage()
112@unittest.skipUnless(HAS_HSM, "lsst.meas.extensions.shapeHSM not available")
113class TrailedSourcesFailuresTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
114 """Tests of various failure modes of the trailed source plugin.
115 """
116 def setUp(self):
117 self.center = lsst.geom.Point2D(50.1, 49.8)
118 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30),
119 lsst.geom.Extent2I(140, 160))
120 self.dataset = TrailedTestDataset(self.bbox)
121 self.trail = TrailedSource(100000.0, 20, .5, 40, 40)
122 self.dataset.addTrailedSource(self.trail)
123 # So that the tests are consistent about output field names.
124 self.name = "trailedSource"
126 def _setupPlugin(self, plugins=None):
127 """Return a prepared trailed source plugin, catalog, and exposure for
128 use with mocked failures. Runs both base_SdssShape and
129 ext_shapeHSM_HsmSourceMoments, with HSM as the shape slot so the
130 NaivePlugin's SDSS-to-HSM fallback path can be exercised.
131 """
132 hsmPlugin = "ext_shapeHSM_HsmSourceMoments"
133 schema = TrailedTestDataset.makeMinimalSchema()
134 dependencies = ["base_SdssCentroid", "base_SdssShape"]
135 dependencies += plugins if plugins is not None else []
136 config = self.makeSingleFrameMeasurementConfig(plugin=hsmPlugin,
137 dependencies=dependencies)
138 config.slots.shape = hsmPlugin
139 config.slots.centroid = "base_SdssCentroid"
140 trailedPlugin = SingleFrameNaiveTrailPlugin(SingleFrameNaiveTrailPlugin.ConfigClass(),
141 self.name,
142 schema,
143 None)
144 task = self.makeSingleFrameMeasurementTask(
145 plugin=hsmPlugin,
146 dependencies=dependencies,
147 config=config,
148 schema=schema
149 )
150 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
151 task.run(catalog, exposure)
152 return trailedPlugin, catalog, exposure
154 def testShapeFlag(self):
155 """Test that the correct trailed source flags get set if both shape
156 measurements fail.
157 """
158 trailedPlugin, catalog, exposure = self._setupPlugin()
159 catalog["base_SdssShape_flag"] = True
160 catalog["ext_shapeHSM_HsmSourceMoments_flag"] = True
161 trailedPlugin.measure(catalog[0], exposure)
162 self.assertTrue(catalog[0][f"{self.name}_flag"])
163 self.assertTrue(catalog[0][f"{self.name}_flag_shape"])
164 self.assertEqual(catalog[0][f"{self.name}_algorithmKey"], 0)
166 def testHsmShapeFallback(self):
167 """Test that NaivePlugin falls back to the shape slot (HSM) when SDSS
168 shape fails but HSM succeeds.
169 """
170 trailedPlugin, catalog, exposure = self._setupPlugin()
172 # Simulate SDSS shape failure; HSM shape slot remains valid.
173 catalog["base_SdssShape_flag"] = True
174 trailedPlugin.measure(catalog[0], exposure)
176 # Plugin should succeed using HSM moments.
177 self.assertFalse(catalog[0][f"{self.name}_flag"])
178 self.assertFalse(catalog[0][f"{self.name}_flag_shape"])
180 length = catalog[0].get(f"{self.name}_length")
181 self.assertTrue(np.isfinite(length))
182 self.assertGreater(length, 0)
183 self.assertEqual(catalog[0].get(f"{self.name}_algorithmKey"), 2)
185 def testBothShapesFail(self):
186 """Test that NaivePlugin sets failure flags when both SDSS shape and
187 the HSM shape slot fail.
188 """
189 trailedPlugin, catalog, exposure = self._setupPlugin()
191 # Simulate failure of both shape measurements.
192 catalog["base_SdssShape_flag"] = True
193 catalog["ext_shapeHSM_HsmSourceMoments_flag"] = True
194 trailedPlugin.measure(catalog[0], exposure)
196 self.assertTrue(catalog[0][f"{self.name}_flag"])
197 self.assertTrue(catalog[0][f"{self.name}_flag_shape"])
198 self.assertEqual(catalog[0][f"{self.name}_algorithmKey"], 0)
201# Following from meas_base/test_NaiveCentroid.py
202# Taken from NaiveCentroidTestCase
203@classParameters(length=Ls, theta=thetas, xc=xcs, yc=ycs)
204class TrailedSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
206 def setUp(self):
207 self.center = lsst.geom.Point2D(50.1, 49.8)
208 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30),
209 lsst.geom.Extent2I(140, 160))
210 self.dataset = TrailedTestDataset(self.bbox)
212 self.trail = TrailedSource(100000.0, self.length, self.theta, self.xc, self.yc)
213 self.dataset.addTrailedSource(self.trail)
215 @staticmethod
216 def transformMoments(Ixx, Iyy, Ixy):
217 """Transform second-moments to semi-major and minor axis.
218 """
219 xmy = Ixx - Iyy
220 xpy = Ixx + Iyy
221 xmy2 = xmy*xmy
222 xy2 = Ixy*Ixy
223 a2 = 0.5 * (xpy + np.sqrt(xmy2 + 4.0*xy2))
224 b2 = 0.5 * (xpy - np.sqrt(xmy2 + 4.0*xy2))
225 return a2, b2
227 @staticmethod
228 def f_length(x):
229 return SingleFrameNaiveTrailPlugin.findLength(*x)[0]
231 @staticmethod
232 def g_length(x):
233 return SingleFrameNaiveTrailPlugin.findLength(*x)[1]
235 @staticmethod
236 def f_flux(x, model):
237 return model.computeFluxWithGradient(x)[0]
239 @staticmethod
240 def g_flux(x, model):
241 return model.computeFluxWithGradient(x)[1]
243 @staticmethod
244 def central_difference(func, x, *args, h=1e-8):
245 result = np.zeros(len(x))
246 for i in range(len(x)):
247 xp = x.copy()
248 xp[i] += h
249 fp = func(xp, *args)
251 xm = x.copy()
252 xm[i] -= h
253 fm = func(xm, *args)
254 result[i] = (fp - fm) / (2*h)
256 return result
258 def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
259 config=None, schema=None, algMetadata=None):
260 """Set up a measurement task for a trailed source plugin.
261 """
263 config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
264 dependencies=dependencies)
266 # Make sure the shape slot is base_SdssShape
267 config.slots.shape = "base_SdssShape"
268 return self.makeSingleFrameMeasurementTask(plugin=plugin,
269 dependencies=dependencies,
270 config=config, schema=schema,
271 algMetadata=algMetadata)
273 def testNaivePlugin(self):
274 """Test the NaivePlugin measurements.
275 Given a `TrailedTestDataset`, run the NaivePlugin measurement and
276 compare the measured parameters to the true values.
277 """
279 # Set up and run Naive measurement.
280 task = self.makeTrailedSourceMeasurementTask(
281 plugin="ext_trailedSources_Naive",
282 dependencies=("base_SdssCentroid", "base_SdssShape")
283 )
284 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
285 task.run(catalog, exposure)
286 record = catalog[0]
288 # Check the RA and Dec measurements
289 wcs = exposure.getWcs()
290 spt = wcs.pixelToSky(self.center)
291 ra_true = spt.getRa().asDegrees()
292 dec_true = spt.getDec().asDegrees()
293 ra_meas = record.get("ext_trailedSources_Naive_ra")
294 dec_meas = record.get("ext_trailedSources_Naive_dec")
295 self.assertFloatsAlmostEqual(ra_true, ra_meas, atol=None, rtol=0.01)
296 self.assertFloatsAlmostEqual(dec_true, dec_meas, atol=None, rtol=0.01)
298 # Check that root finder converged
299 converged = record.get("ext_trailedSources_Naive_flag_noConverge")
300 self.assertFalse(converged)
302 # Compare true with measured length, angle, and flux.
303 # Accuracy is dependent on the second-moments measurements, so the
304 # rtol values are simply rough upper bounds.
305 length = record.get("ext_trailedSources_Naive_length")
306 theta = record.get("ext_trailedSources_Naive_angle")
307 flux = record.get("ext_trailedSources_Naive_flux")
308 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1)
309 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi,
310 atol=np.arctan(1/length), rtol=None)
311 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1)
313 # Test function gradients versus finite difference derivatives
314 # Do length first
315 Ixx, Iyy, Ixy = record.getShape().getParameterVector()
316 a2, b2 = self.transformMoments(Ixx, Iyy, Ixy)
317 self.assertLessEqual(check_grad(self.f_length, self.g_length, [a2, b2]), 1e-6)
319 # Now flux gradient
320 xc = record.get("base_SdssShape_x")
321 yc = record.get("base_SdssShape_y")
322 params = np.array([xc, yc, 1.0, length, theta])
323 cutout = getMeasurementCutout(record, exposure)
324 model = VeresModel(cutout)
325 gradNum = self.central_difference(self.f_flux, params, model, h=9e-5)
326 gradMax = np.max(np.abs(gradNum - self.g_flux(params, model)))
327 self.assertLessEqual(gradMax, 1e-5)
329 # Check test setup
330 self.assertNotEqual(length, self.trail.length)
331 self.assertNotEqual(theta, self.trail.angle)
333 # Make sure measurement flag is False
334 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
336 # Check algorithmKey is set
337 self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1)
339 def testVeresPlugin(self):
340 """Test the VeresPlugin measurements.
341 Given a `TrailedTestDataset`, run the VeresPlugin measurement and
342 compare the measured parameters to the true values.
343 """
345 # Set up and run Veres measurement.
346 task = self.makeTrailedSourceMeasurementTask(
347 plugin="ext_trailedSources_Veres",
348 dependencies=(
349 "base_SdssCentroid",
350 "base_SdssShape",
351 "ext_trailedSources_Naive")
352 )
353 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
354 task.run(catalog, exposure)
355 record = catalog[0]
357 # Make sure optmizer converged
358 converged = record.get("ext_trailedSources_Veres_flag_nonConvergence")
359 self.assertFalse(converged)
361 # Compare measured trail length, angle, and flux to true values
362 # These measurements should perform at least as well as NaivePlugin
363 length = record.get("ext_trailedSources_Veres_length")
364 theta = record.get("ext_trailedSources_Veres_angle")
365 flux = record.get("ext_trailedSources_Veres_flux")
366 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1)
367 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi,
368 atol=np.arctan(1/length), rtol=None)
369 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1)
371 xc = record.get("ext_trailedSources_Veres_centroid_x")
372 yc = record.get("ext_trailedSources_Veres_centroid_y")
373 params = np.array([xc, yc, flux, length, theta])
374 cutout = getMeasurementCutout(record, exposure)
375 model = VeresModel(cutout)
376 gradNum = self.central_difference(model, params, h=1e-6)
377 gradMax = np.max(np.abs(gradNum - model.gradient(params)))
378 self.assertLessEqual(gradMax, 1e-5)
380 # Make sure test setup is working as expected
381 self.assertNotEqual(length, self.trail.length)
382 self.assertNotEqual(theta, self.trail.angle)
384 # Test that reduced chi-squared is reasonable
385 rChiSq = record.get("ext_trailedSources_Veres_rChiSq")
386 self.assertGreater(rChiSq, 0.8)
387 self.assertLess(rChiSq, 1.3)
389 # Make sure measurement flag is False
390 self.assertFalse(record.get("ext_trailedSources_Veres_flag"))
392 def testMonteCarlo(self):
393 """Test the uncertainties in trail measurements from NaivePlugin
394 """
395 # Adapted from lsst.meas.base
397 # Set up Naive measurement and dependencies.
398 task = self.makeTrailedSourceMeasurementTask(
399 plugin="ext_trailedSources_Naive",
400 dependencies=("base_SdssCentroid", "base_SdssShape")
401 )
403 nSamples = 2000
404 catalog = afwTable.SourceCatalog(task.schema)
405 sample = 0
406 seed = 0
407 while sample < nSamples:
408 seed += 1
409 exp, cat = self.dataset.realize(100.0, task.schema, randomSeed=seed)
410 rec = cat[0]
411 task.run(cat, exp)
413 # Accuracy of this measurement is entirely dependent on shape and
414 # centroiding. Skip when shape measurement fails.
415 if rec['base_SdssShape_flag']:
416 continue
417 catalog.append(rec)
418 sample += 1
420 catalog = catalog.copy(deep=True)
421 nameBase = "ext_trailedSources_Naive_"
423 # Currently, the errors don't include covariances, so just make sure
424 # we're close or at least over estimate
425 length = catalog[nameBase+"length"]
426 lengthErr = catalog[nameBase+"lengthErr"]
427 lengthStd = np.nanstd(length)
428 lengthErrMean = np.nanmean(lengthErr)
429 diff = (lengthErrMean - lengthStd) / lengthErrMean
430 self.assertGreater(diff, -0.1)
431 self.assertLess(diff, 0.5)
433 angle = catalog[nameBase+"angle"]
434 if (np.max(angle) - np.min(angle)) > np.pi/2:
435 angle = angle % np.pi # Wrap if bimodal
436 angleErr = catalog[nameBase+"angleErr"]
437 angleStd = np.nanstd(angle)
438 angleErrMean = np.nanmean(angleErr)
439 diff = (angleErrMean - angleStd) / angleErrMean
440 self.assertGreater(diff, -0.1)
441 self.assertLess(diff, 0.6)
444class TestMemory(lsst.utils.tests.MemoryTestCase):
445 pass
448def setup_module(module):
449 lsst.utils.tests.init()
452if __name__ == "__main__": 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 lsst.utils.tests.init()
454 unittest.main()