Coverage for tests / test_trailedEdgeSources.py: 16%

193 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 08:43 +0000

1# 

2# This file is part of meas_extensions_trailedSources. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24import numpy as np 

25import unittest 

26import lsst.utils.tests 

27import lsst.meas.extensions.trailedSources 

28from lsst.meas.base.tests import AlgorithmTestCase 

29from lsst.utils.tests import classParameters 

30from lsst.geom import Point2I, Point2D, Box2I, Extent2I 

31from unittest.mock import patch 

32 

# True trail parameters used to parameterize the test cases below. Each row
# describes one trail as (length, angle, centroid x, centroid y); the columns
# are split into the four module-level arrays consumed by `classParameters`.
_TRAIL_PARAMETERS = (
    (5, 100, 100, 100),
    (5, 0, 20, 20),
    (10, 5, -20, -30),
    (4, 4, 90, 100),
)
trail_lengths, trail_angles, trail_x_coords, trail_y_coords = (
    np.array(column) for column in zip(*_TRAIL_PARAMETERS)
)

38 

39 

class TrailedEdgeSource:
    """Container for the true parameters of a single simulated trail.

    Parameters
    ----------
    instFlux
        Flux assigned to the trail.
    length
        Full length of the trail; each end point lies ``length / 2`` from the
        centroid.
    angle
        Orientation of the trail. It is handed directly to `numpy.cos` and
        `numpy.sin`, so it is interpreted in radians.
    xc, yc
        Coordinates of the trail centroid.

    Notes
    -----
    The end points are stored as ``(x0, y0)`` and ``(x1, y1)`` and the
    centroid as a `Point2D` in ``center``.
    """

    def __init__(self, instFlux, length, angle, xc, yc):
        self.instFlux = instFlux
        self.length = length
        self.angle = angle
        self.center = Point2D(xc, yc)

        # Offset from the centroid to either end of the trail.
        dx = length / 2 * np.cos(angle)
        dy = length / 2 * np.sin(angle)
        self.x0, self.y0 = xc - dx, yc - dy
        self.x1, self.y1 = xc + dx, yc + dy

53 

54 

class TrailedTaskSetup:
    """Helper that builds a measurement task for a trailed-source plugin.

    The method is written to be called with a test case as ``self``; that
    object must provide ``makeSingleFrameMeasurementConfig`` and
    ``makeSingleFrameMeasurementTask``.
    """

    def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
                                         config=None, schema=None,
                                         algMetadata=None):
        """Set up a measurement task for a trailed source plugin.

        Parameters
        ----------
        plugin
            Name of the measurement plugin to run.
        dependencies
            Names of the plugins that must run alongside ``plugin``.
        config
            Measurement configuration to use. When `None` (the default) a
            configuration is built with ``makeSingleFrameMeasurementConfig``
            from ``plugin`` and ``dependencies``. A supplied configuration is
            used as given, except that its shape slot is overwritten (see
            below).
        schema, algMetadata
            Forwarded unchanged to ``makeSingleFrameMeasurementTask``.

        Returns
        -------
        task
            Whatever ``self.makeSingleFrameMeasurementTask`` returns.
        """
        # Only build a configuration when the caller did not provide one;
        # previously a caller-supplied ``config`` was silently discarded.
        if config is None:
            config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
                                                           dependencies=dependencies)

        # Make sure the shape slot is base_SdssShape
        config.slots.shape = "base_SdssShape"
        return self.makeSingleFrameMeasurementTask(plugin=plugin,
                                                   dependencies=dependencies,
                                                   config=config,
                                                   schema=schema,
                                                   algMetadata=algMetadata)

72 

73 

74# "Extend" meas.base.tests.TestDataset 

class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
    """A dataset for testing trailed source measurements.

    Given a `TrailedEdgeSource`, construct a record of the true values and an
    Exposure.
    """

    def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
        # Pure pass-through to the base-class constructor.
        super().__init__(bbox, threshold, exposure, **kwds)

    def addTrailedSource(self, trail, edge=True):
        """Add a trailed source to the simulation.

        Re-implemented version of
        `lsst.meas.base.tests.TestDataset.addSource`. Sums a Gaussian PSF at
        evenly spaced points along the trail to obtain an image of a trailed
        source and, optionally, sets the EDGE mask plane on the border of the
        exposure.

        Parameters
        ----------
        trail : `TrailedEdgeSource`
            True parameters of the trail; ``center``, ``length``,
            ``instFlux`` and the end points ``x0, y0, x1, y1`` are read.
        edge : `bool`, optional
            If `True` (default), set the EDGE mask plane on the first and
            last 20 rows and columns of the exposure.

        Returns
        -------
        record
            The new truth-catalog record for the trail.
        image
            The image of ``self.exposure`` after the trail has been drawn.

        Notes
        -----
        After drawing, the whole exposure image is rescaled so that its
        pixel sum equals ``trail.instFlux``.
        """
        record = self.catalog.addNew()
        record.set(self.keys["centroid"], trail.center)
        # Fixed seed, so the centroid covariance is identical on every call.
        rng = np.random.default_rng(32)
        covariance = rng.normal(0, 0.1, 4).reshape(2, 2)
        # Make the 2x2 matrix symmetric.
        covariance[0, 1] = covariance[1, 0]
        record.set(self.keys["centroid_sigma"], covariance.astype(np.float32))
        record.set(self.keys["shape"], self.psfShape)
        record.set(self.keys["isStar"], False)

        # Sum the PSF at each sample point along the trail (two samples per
        # unit of trail length).
        numIter = int(2 * trail.length)
        xp = np.linspace(trail.x0, trail.x1, num=numIter)
        yp = np.linspace(trail.y0, trail.y1, num=numIter)
        imageArray = self.exposure.getMaskedImage().getImage().getArray()
        bbox = self.exposure.getBBox()
        for (x, y) in zip(xp, yp):
            pt = Point2D(x, y)
            # NOTE(review): lsst.afw.geom is not imported explicitly by this
            # file; presumably it is loaded as a side effect of the
            # lsst.meas.base imports -- confirm.
            im = self.drawGaussian(bbox, trail.instFlux,
                                   lsst.afw.geom.Ellipse(self.psfShape, pt))
            imageArray[:, :] += im.getArray()

        planes = self.exposure.mask.getMaskPlaneDict()
        dim = self.exposure.getBBox().getDimensions()

        # Add edge flags to the first and last 20 columns and rows.
        if edge:
            edgeWidth = 20
            edgePlane = planes['EDGE']
            width, height = dim[0], dim[1]
            mask = self.exposure.mask

            # Top and bottom bands: full rows.
            for y in range(edgeWidth):
                mask.setMaskPlaneValues(edgePlane, 0, width - 1, y)
                mask.setMaskPlaneValues(edgePlane, 0, width - 1, y + height - edgeWidth)

            # Left and right bands. The upper x bound is treated as inclusive
            # (as the ``width - 1`` full-row bound above implies), so the left
            # band ends at ``edgeWidth - 1``; the previous bound of 20 masked
            # 21 columns on the left but only 20 on the right.
            for y in range(height):
                mask.setMaskPlaneValues(edgePlane, 0, edgeWidth - 1, y)
                mask.setMaskPlaneValues(edgePlane, width - edgeWidth, width - 1, y)

        # Renormalize so the image sums to the requested flux.
        totFlux = self.exposure.image.array.sum()
        self.exposure.image.array /= totFlux
        self.exposure.image.array *= trail.instFlux

        record.set(self.keys["instFlux"], trail.instFlux)
        self._installFootprint(record, self.exposure.getImage())

        return record, self.exposure.getImage()

133 

134 

135# Following from test_trailedSources 

@classParameters(length=trail_lengths, theta=trail_angles, xc=trail_x_coords, yc=trail_y_coords)
class TrailedEdgeSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test if ext_trailedSources_Naive_flag_edge is set correctly.

    Given a `TrailedEdgeSource`, test if the edge flag is set correctly in
    the source catalog after the task built by
    `TrailedTaskSetup.makeTrailedSourceMeasurementTask` has been run on the
    source catalog.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        self.bbox = Box2I(Point2I(-20, -30), Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Trail built from this parameterization's true values; EDGE mask
        # pixels are added around the border of the exposure.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail)

    def tearDown(self):
        del self.center
        del self.bbox
        del self.trail
        del self.dataset

    def _makeNaiveTask(self):
        """Build the Naive trailed-source measurement task used by the tests.
        """
        return TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )

    @staticmethod
    def _isEdgePixel(exposure, x, y):
        """Return whether the EDGE bit is set in the mask pixel at (x, y).
        """
        return exposure.mask[Point2I(x, y)] & exposure.mask.getPlaneBitMask('EDGE') != 0

    def _checkEndPixels(self, exposure, begin, end, beginEdge, endEdge):
        """Assert the EDGE state of the mask pixels under both trail ends.

        These checks ensure the mask pixels the trailed sources are compared
        with have the expected flags set.
        """
        for (x, y), expected in ((begin, beginEdge), (end, endEdge)):
            check = self.assertTrue if expected else self.assertFalse
            check(self._isEdgePixel(exposure, x, y))

    def _checkNaiveFlags(self, record, edge, offImage, nan):
        """Assert the Naive plugin flags stored in ``record``.

        ``ext_trailedSources_Naive_flag`` must never be set: a failed trailed
        source measurement with the edge flag set would mean the edge flag
        was set despite the measurement failing.
        """
        expectedFlags = (
            ("ext_trailedSources_Naive_flag_edge", edge),
            ("ext_trailedSources_Naive_flag", False),
            ("ext_trailedSources_Naive_flag_off_image", offImage),
            ("ext_trailedSources_Naive_flag_nan", nan),
        )
        for name, expected in expectedFlags:
            check = self.assertTrue if expected else self.assertFalse
            check(record.get(name))

    def testEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check the edge flag of the trailed source. The assertions expect:

        - [100, 100]: the end of the trail lies on an EDGE-masked pixel, so
          the edge flag is set;
        - [20, 20] and [90, 100]: neither end lies on an EDGE-masked pixel,
          so the edge flag is not set;
        - [-20, -30]: the trail extends off the chip, so both the edge and
          off_image flags are set.
        """
        # Set up and run Naive measurement.
        task = self._makeNaiveTask()
        exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
        task.run(catalog, exposure)
        record = catalog[0]

        # Measured trail end points, truncated to pixel indices.
        x1 = int(record['ext_trailedSources_Naive_x1'])
        y1 = int(record['ext_trailedSources_Naive_y1'])
        x0 = int(record['ext_trailedSources_Naive_x0'])
        y0 = int(record['ext_trailedSources_Naive_y0'])

        truthX = record['truth_x']

        # Test case with the end of the trail on an edge pixel.
        if truthX == 100:
            self._checkEndPixels(exposure, (x0, y0), (x1, y1), beginEdge=False, endEdge=True)
            # Make sure measurement edge flag is set, but Naive_flag is not.
            self._checkNaiveFlags(record, edge=True, offImage=False, nan=False)

        # Test cases with neither end of the trail on an edge pixel.
        elif truthX == 20 or truthX == 90:
            self._checkEndPixels(exposure, (x0, y0), (x1, y1), beginEdge=False, endEdge=False)
            # Make sure measurement Naive_flag_edge and Naive_flag not set
            self._checkNaiveFlags(record, edge=False, offImage=False, nan=False)

        # Test case with trailed source extending off chip. The mask is not
        # inspected here.
        else:
            self.assertEqual(truthX, -20)
            self._checkNaiveFlags(record, edge=True, offImage=True, nan=False)

        self.assertEqual(record.get("ext_trailedSources_Naive_algorithmKey"), 1)

    def testNanFlag(self):
        """Test if nan flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement which
        has trailed sources where one of the end point values results in a
        nan.
        """
        # Set up and run Naive measurement.
        task = self._makeNaiveTask()
        exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)

        original_check_trail_function = task.plugins['ext_trailedSources_Naive'].check_trail

        # Used to simulate a trailed source where one of the coordinates is a
        # nan. The positional arguments are read as
        # (measRecord, exposure, x0, y0, x1, y1, length).
        def check_trail_mock(*args, **kwargs):
            measRecord, maskedExposure, x0, y0, x1 = args[:5]
            length = args[6]
            # Override y1 to test NAN flagging.
            measRecord['ext_trailedSources_Naive_y1'] = np.nan
            return original_check_trail_function(measRecord, maskedExposure, x0, y0, x1, np.nan, length)

        # Mock check_trail so that the trailed source it is checking contains
        # a nan at one of its end points. The context manager guarantees the
        # patch is removed even if the measurement raises, so it cannot leak
        # into other tests.
        with patch(
            'lsst.meas.extensions.trailedSources.NaivePlugin.SingleFrameNaiveTrailPlugin.check_trail',
            side_effect=check_trail_mock,
        ):
            task.run(catalog, exposure)
        record = catalog[0]

        truthX = record['truth_x']

        # Trails whose true centroid is on the image: with one end point set
        # to nan, only the nan flag is set (the edge flag does not end up
        # set).
        if truthX in (100, 20, 90):
            self._checkNaiveFlags(record, edge=False, offImage=False, nan=True)

        # Test case with trailed source extending off chip. One coordinate
        # is off image the other is nan, so edge, off_image, and nan should
        # be set.
        else:
            self.assertEqual(truthX, -20)
            self._checkNaiveFlags(record, edge=True, offImage=True, nan=True)

340 

341 

@classParameters(length=[10], theta=[5], xc=[-20], yc=[-30])
class TrailedEdgeSourcesOffImageTest(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test if ext_trailedSources_Naive_flag_edge is set for an off-image
    trail.

    A single trail centred at (-20, -30) is drawn without any EDGE mask
    pixels, and the source catalog is inspected after the task built by
    `TrailedTaskSetup.makeTrailedSourceMeasurementTask` has been run on it.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        self.bbox = Box2I(Point2I(-20, -30), Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Draw the trail without setting any EDGE mask pixels.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail, edge=False)

    def tearDown(self):
        # Drop everything created in setUp.
        for attribute in ("center", "bbox", "trail", "dataset"):
            delattr(self, attribute)

    def testOffImageEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py when the
        source extends off the image.

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check that the edge flag is set when a source extends off the chip.
        Edge pixels are not set in this test.
        """
        # Set up and run Naive measurement.
        naiveTask = TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )
        exposure, catalog = self.dataset.realize(5.0, naiveTask.schema, randomSeed=0)
        naiveTask.run(catalog, exposure)
        measured = catalog[0]

        self.assertTrue(measured.get("ext_trailedSources_Naive_flag_edge"))
        self.assertFalse(measured.get("ext_trailedSources_Naive_flag"))
        self.assertEqual(measured.get("ext_trailedSources_Naive_algorithmKey"), 1)

389 

390 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from `lsst.utils.tests.MemoryTestCase`."""

393 

394 

def setup_module(module):
    """Initialize the LSST test utilities for this module.

    ``module`` is accepted for the caller's benefit and is not used.
    """
    lsst.utils.tests.init()

397 

398 

# Script entry point: initialize the LSST test utilities, then hand over to
# the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()