Coverage for tests/test_trailedSources.py: 22%

253 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-29 01:37 -0700

1# 

2# This file is part of meas_extensions_trailedSources. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24import numpy as np 

25import unittest 

26import lsst.utils.tests 

27import lsst.meas.extensions.trailedSources 

28from scipy.optimize import check_grad 

29import lsst.afw.table as afwTable 

30from lsst.meas.base.tests import AlgorithmTestCase 

31from lsst.meas.extensions.trailedSources import SingleFrameNaiveTrailPlugin 

32from lsst.meas.extensions.trailedSources import VeresModel 

33from lsst.meas.extensions.trailedSources.utils import getMeasurementCutout 

34from lsst.utils.tests import classParameters 

35try: 

36 import lsst.meas.extensions.shapeHSM # noqa: F401 

37 HAS_HSM = True 

38except ImportError: 

39 HAS_HSM = False 

40 

41 

42# Trailed-source length, angle, and centroid. 

43rng = np.random.default_rng(432) 

44nTrails = 50 

45Ls = rng.uniform(2, 20, nTrails) 

46thetas = rng.uniform(0, 2*np.pi, nTrails) 

47xcs = rng.uniform(0, 100, nTrails) 

48ycs = rng.uniform(0, 100, nTrails) 

49 

50 

51class TrailedSource: 

52 """Holds a set of true trail parameters. 

53 """ 

54 

55 def __init__(self, instFlux, length, angle, xc, yc): 

56 self.instFlux = instFlux 

57 self.length = length 

58 self.angle = angle 

59 self.center = lsst.geom.Point2D(xc, yc) 

60 self.x0 = xc - length/2 * np.cos(angle) 

61 self.y0 = yc - length/2 * np.sin(angle) 

62 self.x1 = xc + length/2 * np.cos(angle) 

63 self.y1 = yc + length/2 * np.sin(angle) 

64 

65 

66# "Extend" meas.base.tests.TestDataset 

67class TrailedTestDataset(lsst.meas.base.tests.TestDataset): 

68 """A dataset for testing trailed source measurements. 

69 Given a `TrailedSource`, construct a record of the true values and an 

70 Exposure. 

71 """ 

72 

73 def __init__(self, bbox, threshold=10.0, exposure=None, **kwds): 

74 super().__init__(bbox, threshold, exposure, **kwds) 

75 

76 def addTrailedSource(self, trail): 

77 """Add a trailed source to the simulation. 

78 'Re-implemented' version of 

79 `lsst.meas.base.tests.TestDataset.addSource`. Numerically integrates a 

80 Gaussian PSF over a line to obtain am image of a trailed source. 

81 """ 

82 

83 record = self.catalog.addNew() 

84 record.set(self.keys["centroid"], trail.center) 

85 rng = np.random.default_rng(32) 

86 covariance = rng.normal(0, 0.1, 4).reshape(2, 2) 

87 covariance[0, 1] = covariance[1, 0] 

88 record.set(self.keys["centroid_sigma"], covariance.astype(np.float32)) 

89 record.set(self.keys["shape"], self.psfShape) 

90 record.set(self.keys["isStar"], False) 

91 

92 # Sum the psf at each 

93 numIter = int(10*trail.length) 

94 xp = np.linspace(trail.x0, trail.x1, num=numIter) 

95 yp = np.linspace(trail.y0, trail.y1, num=numIter) 

96 for (x, y) in zip(xp, yp): 

97 pt = lsst.geom.Point2D(x, y) 

98 im = self.drawGaussian(self.exposure.getBBox(), trail.instFlux, 

99 lsst.afw.geom.Ellipse(self.psfShape, pt)) 

100 self.exposure.getMaskedImage().getImage().getArray()[:, :] += im.getArray() 

101 

102 totFlux = self.exposure.image.array.sum() 

103 self.exposure.image.array /= totFlux 

104 self.exposure.image.array *= trail.instFlux 

105 

106 record.set(self.keys["instFlux"], trail.instFlux) 

107 self._installFootprint(record, self.exposure.getImage()) 

108 

109 return record, self.exposure.getImage() 

110 

111 

112@unittest.skipUnless(HAS_HSM, "lsst.meas.extensions.shapeHSM not available") 

113class TrailedSourcesFailuresTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase): 

114 """Tests of various failure modes of the trailed source plugin. 

115 """ 

116 def setUp(self): 

117 self.center = lsst.geom.Point2D(50.1, 49.8) 

118 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30), 

119 lsst.geom.Extent2I(140, 160)) 

120 self.dataset = TrailedTestDataset(self.bbox) 

121 self.trail = TrailedSource(100000.0, 20, .5, 40, 40) 

122 self.dataset.addTrailedSource(self.trail) 

123 # So that the tests are consistent about output field names. 

124 self.name = "trailedSource" 

125 

126 def _setupPlugin(self, plugins=None): 

127 """Return a prepared trailed source plugin, catalog, and exposure for 

128 use with mocked failures. Runs both base_SdssShape and 

129 ext_shapeHSM_HsmSourceMoments, with HSM as the shape slot so the 

130 NaivePlugin's SDSS-to-HSM fallback path can be exercised. 

131 """ 

132 hsmPlugin = "ext_shapeHSM_HsmSourceMoments" 

133 schema = TrailedTestDataset.makeMinimalSchema() 

134 dependencies = ["base_SdssCentroid", "base_SdssShape"] 

135 dependencies += plugins if plugins is not None else [] 

136 config = self.makeSingleFrameMeasurementConfig(plugin=hsmPlugin, 

137 dependencies=dependencies) 

138 config.slots.shape = hsmPlugin 

139 config.slots.centroid = "base_SdssCentroid" 

140 trailedPlugin = SingleFrameNaiveTrailPlugin(SingleFrameNaiveTrailPlugin.ConfigClass(), 

141 self.name, 

142 schema, 

143 None) 

144 task = self.makeSingleFrameMeasurementTask( 

145 plugin=hsmPlugin, 

146 dependencies=dependencies, 

147 config=config, 

148 schema=schema 

149 ) 

150 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

151 task.run(catalog, exposure) 

152 return trailedPlugin, catalog, exposure 

153 

154 def testShapeFlag(self): 

155 """Test that the correct trailed source flags get set if both shape 

156 measurements fail. 

157 """ 

158 trailedPlugin, catalog, exposure = self._setupPlugin() 

159 catalog["base_SdssShape_flag"] = True 

160 catalog["ext_shapeHSM_HsmSourceMoments_flag"] = True 

161 trailedPlugin.measure(catalog[0], exposure) 

162 self.assertTrue(catalog[0][f"{self.name}_flag"]) 

163 self.assertTrue(catalog[0][f"{self.name}_flag_shape"]) 

164 self.assertEqual(catalog[0][f"{self.name}_algorithmKey"], 0) 

165 

166 def testHsmShapeFallback(self): 

167 """Test that NaivePlugin falls back to the shape slot (HSM) when SDSS 

168 shape fails but HSM succeeds. 

169 """ 

170 trailedPlugin, catalog, exposure = self._setupPlugin() 

171 

172 # Simulate SDSS shape failure; HSM shape slot remains valid. 

173 catalog["base_SdssShape_flag"] = True 

174 trailedPlugin.measure(catalog[0], exposure) 

175 

176 # Plugin should succeed using HSM moments. 

177 self.assertFalse(catalog[0][f"{self.name}_flag"]) 

178 self.assertFalse(catalog[0][f"{self.name}_flag_shape"]) 

179 

180 length = catalog[0].get(f"{self.name}_length") 

181 self.assertTrue(np.isfinite(length)) 

182 self.assertGreater(length, 0) 

183 self.assertEqual(catalog[0].get(f"{self.name}_algorithmKey"), 2) 

184 

185 def testBothShapesFail(self): 

186 """Test that NaivePlugin sets failure flags when both SDSS shape and 

187 the HSM shape slot fail. 

188 """ 

189 trailedPlugin, catalog, exposure = self._setupPlugin() 

190 

191 # Simulate failure of both shape measurements. 

192 catalog["base_SdssShape_flag"] = True 

193 catalog["ext_shapeHSM_HsmSourceMoments_flag"] = True 

194 trailedPlugin.measure(catalog[0], exposure) 

195 

196 self.assertTrue(catalog[0][f"{self.name}_flag"]) 

197 self.assertTrue(catalog[0][f"{self.name}_flag_shape"]) 

198 self.assertEqual(catalog[0][f"{self.name}_algorithmKey"], 0) 

199 

200 

201# Following from meas_base/test_NaiveCentroid.py 

202# Taken from NaiveCentroidTestCase 

203@classParameters(length=Ls, theta=thetas, xc=xcs, yc=ycs) 

204class TrailedSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase): 

205 

206 def setUp(self): 

207 self.center = lsst.geom.Point2D(50.1, 49.8) 

208 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30), 

209 lsst.geom.Extent2I(140, 160)) 

210 self.dataset = TrailedTestDataset(self.bbox) 

211 

212 self.trail = TrailedSource(100000.0, self.length, self.theta, self.xc, self.yc) 

213 self.dataset.addTrailedSource(self.trail) 

214 

215 @staticmethod 

216 def transformMoments(Ixx, Iyy, Ixy): 

217 """Transform second-moments to semi-major and minor axis. 

218 """ 

219 xmy = Ixx - Iyy 

220 xpy = Ixx + Iyy 

221 xmy2 = xmy*xmy 

222 xy2 = Ixy*Ixy 

223 a2 = 0.5 * (xpy + np.sqrt(xmy2 + 4.0*xy2)) 

224 b2 = 0.5 * (xpy - np.sqrt(xmy2 + 4.0*xy2)) 

225 return a2, b2 

226 

227 @staticmethod 

228 def f_length(x): 

229 return SingleFrameNaiveTrailPlugin.findLength(*x)[0] 

230 

231 @staticmethod 

232 def g_length(x): 

233 return SingleFrameNaiveTrailPlugin.findLength(*x)[1] 

234 

235 @staticmethod 

236 def f_flux(x, model): 

237 return model.computeFluxWithGradient(x)[0] 

238 

239 @staticmethod 

240 def g_flux(x, model): 

241 return model.computeFluxWithGradient(x)[1] 

242 

243 @staticmethod 

244 def central_difference(func, x, *args, h=1e-8): 

245 result = np.zeros(len(x)) 

246 for i in range(len(x)): 

247 xp = x.copy() 

248 xp[i] += h 

249 fp = func(xp, *args) 

250 

251 xm = x.copy() 

252 xm[i] -= h 

253 fm = func(xm, *args) 

254 result[i] = (fp - fm) / (2*h) 

255 

256 return result 

257 

258 def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(), 

259 config=None, schema=None, algMetadata=None): 

260 """Set up a measurement task for a trailed source plugin. 

261 """ 

262 

263 config = self.makeSingleFrameMeasurementConfig(plugin=plugin, 

264 dependencies=dependencies) 

265 

266 # Make sure the shape slot is base_SdssShape 

267 config.slots.shape = "base_SdssShape" 

268 return self.makeSingleFrameMeasurementTask(plugin=plugin, 

269 dependencies=dependencies, 

270 config=config, schema=schema, 

271 algMetadata=algMetadata) 

272 

273 def testNaivePlugin(self): 

274 """Test the NaivePlugin measurements. 

275 Given a `TrailedTestDataset`, run the NaivePlugin measurement and 

276 compare the measured parameters to the true values. 

277 """ 

278 

279 # Set up and run Naive measurement. 

280 task = self.makeTrailedSourceMeasurementTask( 

281 plugin="ext_trailedSources_Naive", 

282 dependencies=("base_SdssCentroid", "base_SdssShape") 

283 ) 

284 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

285 task.run(catalog, exposure) 

286 record = catalog[0] 

287 

288 # Check the RA and Dec measurements 

289 wcs = exposure.getWcs() 

290 spt = wcs.pixelToSky(self.center) 

291 ra_true = spt.getRa().asDegrees() 

292 dec_true = spt.getDec().asDegrees() 

293 ra_meas = record.get("ext_trailedSources_Naive_ra") 

294 dec_meas = record.get("ext_trailedSources_Naive_dec") 

295 self.assertFloatsAlmostEqual(ra_true, ra_meas, atol=None, rtol=0.01) 

296 self.assertFloatsAlmostEqual(dec_true, dec_meas, atol=None, rtol=0.01) 

297 

298 # Check that root finder converged 

299 converged = record.get("ext_trailedSources_Naive_flag_noConverge") 

300 self.assertFalse(converged) 

301 

302 # Compare true with measured length, angle, and flux. 

303 # Accuracy is dependent on the second-moments measurements, so the 

304 # rtol values are simply rough upper bounds. 

305 length = record.get("ext_trailedSources_Naive_length") 

306 theta = record.get("ext_trailedSources_Naive_angle") 

307 flux = record.get("ext_trailedSources_Naive_flux") 

308 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1) 

309 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi, 

310 atol=np.arctan(1/length), rtol=None) 

311 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1) 

312 

313 # Test function gradients versus finite difference derivatives 

314 # Do length first 

315 Ixx, Iyy, Ixy = record.getShape().getParameterVector() 

316 a2, b2 = self.transformMoments(Ixx, Iyy, Ixy) 

317 self.assertLessEqual(check_grad(self.f_length, self.g_length, [a2, b2]), 1e-6) 

318 

319 # Now flux gradient 

320 xc = record.get("base_SdssShape_x") 

321 yc = record.get("base_SdssShape_y") 

322 params = np.array([xc, yc, 1.0, length, theta]) 

323 cutout = getMeasurementCutout(record, exposure) 

324 model = VeresModel(cutout) 

325 gradNum = self.central_difference(self.f_flux, params, model, h=9e-5) 

326 gradMax = np.max(np.abs(gradNum - self.g_flux(params, model))) 

327 self.assertLessEqual(gradMax, 1e-5) 

328 

329 # Check test setup 

330 self.assertNotEqual(length, self.trail.length) 

331 self.assertNotEqual(theta, self.trail.angle) 

332 

333 # Make sure measurement flag is False 

334 self.assertFalse(record.get("ext_trailedSources_Naive_flag")) 

335 

336 # Check algorithmKey is set 

337 self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1) 

338 

339 def testVeresPlugin(self): 

340 """Test the VeresPlugin measurements. 

341 Given a `TrailedTestDataset`, run the VeresPlugin measurement and 

342 compare the measured parameters to the true values. 

343 """ 

344 

345 # Set up and run Veres measurement. 

346 task = self.makeTrailedSourceMeasurementTask( 

347 plugin="ext_trailedSources_Veres", 

348 dependencies=( 

349 "base_SdssCentroid", 

350 "base_SdssShape", 

351 "ext_trailedSources_Naive") 

352 ) 

353 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

354 task.run(catalog, exposure) 

355 record = catalog[0] 

356 

357 # Make sure optmizer converged 

358 converged = record.get("ext_trailedSources_Veres_flag_nonConvergence") 

359 self.assertFalse(converged) 

360 

361 # Compare measured trail length, angle, and flux to true values 

362 # These measurements should perform at least as well as NaivePlugin 

363 length = record.get("ext_trailedSources_Veres_length") 

364 theta = record.get("ext_trailedSources_Veres_angle") 

365 flux = record.get("ext_trailedSources_Veres_flux") 

366 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1) 

367 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi, 

368 atol=np.arctan(1/length), rtol=None) 

369 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1) 

370 

371 xc = record.get("ext_trailedSources_Veres_centroid_x") 

372 yc = record.get("ext_trailedSources_Veres_centroid_y") 

373 params = np.array([xc, yc, flux, length, theta]) 

374 cutout = getMeasurementCutout(record, exposure) 

375 model = VeresModel(cutout) 

376 gradNum = self.central_difference(model, params, h=1e-6) 

377 gradMax = np.max(np.abs(gradNum - model.gradient(params))) 

378 self.assertLessEqual(gradMax, 1e-5) 

379 

380 # Make sure test setup is working as expected 

381 self.assertNotEqual(length, self.trail.length) 

382 self.assertNotEqual(theta, self.trail.angle) 

383 

384 # Test that reduced chi-squared is reasonable 

385 rChiSq = record.get("ext_trailedSources_Veres_rChiSq") 

386 self.assertGreater(rChiSq, 0.8) 

387 self.assertLess(rChiSq, 1.3) 

388 

389 # Make sure measurement flag is False 

390 self.assertFalse(record.get("ext_trailedSources_Veres_flag")) 

391 

392 def testMonteCarlo(self): 

393 """Test the uncertainties in trail measurements from NaivePlugin 

394 """ 

395 # Adapted from lsst.meas.base 

396 

397 # Set up Naive measurement and dependencies. 

398 task = self.makeTrailedSourceMeasurementTask( 

399 plugin="ext_trailedSources_Naive", 

400 dependencies=("base_SdssCentroid", "base_SdssShape") 

401 ) 

402 

403 nSamples = 2000 

404 catalog = afwTable.SourceCatalog(task.schema) 

405 sample = 0 

406 seed = 0 

407 while sample < nSamples: 

408 seed += 1 

409 exp, cat = self.dataset.realize(100.0, task.schema, randomSeed=seed) 

410 rec = cat[0] 

411 task.run(cat, exp) 

412 

413 # Accuracy of this measurement is entirely dependent on shape and 

414 # centroiding. Skip when shape measurement fails. 

415 if rec['base_SdssShape_flag']: 

416 continue 

417 catalog.append(rec) 

418 sample += 1 

419 

420 catalog = catalog.copy(deep=True) 

421 nameBase = "ext_trailedSources_Naive_" 

422 

423 # Currently, the errors don't include covariances, so just make sure 

424 # we're close or at least over estimate 

425 length = catalog[nameBase+"length"] 

426 lengthErr = catalog[nameBase+"lengthErr"] 

427 lengthStd = np.nanstd(length) 

428 lengthErrMean = np.nanmean(lengthErr) 

429 diff = (lengthErrMean - lengthStd) / lengthErrMean 

430 self.assertGreater(diff, -0.1) 

431 self.assertLess(diff, 0.5) 

432 

433 angle = catalog[nameBase+"angle"] 

434 if (np.max(angle) - np.min(angle)) > np.pi/2: 

435 angle = angle % np.pi # Wrap if bimodal 

436 angleErr = catalog[nameBase+"angleErr"] 

437 angleStd = np.nanstd(angle) 

438 angleErrMean = np.nanmean(angleErr) 

439 diff = (angleErrMean - angleStd) / angleErrMean 

440 self.assertGreater(diff, -0.1) 

441 self.assertLess(diff, 0.6) 

442 

443 

444class TestMemory(lsst.utils.tests.MemoryTestCase): 

445 pass 

446 

447 

448def setup_module(module): 

449 lsst.utils.tests.init() 

450 

451 

452if __name__ == "__main__": 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 lsst.utils.tests.init() 

454 unittest.main()