Coverage for tests / test_pipelines.py: 14%

122 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-23 01:58 -0700

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import tempfile 

24import unittest 

25 

26import lsst.daf.butler.tests as butlerTests 

27import lsst.pipe.base 

28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name 

29import lsst.utils 

30import lsst.utils.tests 

31 

32from lsst.resources import ResourcePath 

33 

34 

class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.
    """

    def setUp(self):
        """Record the pipeline directory and each pipeline file's synonym subset.
        """
        self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {
            "ApPipe.yaml": "apPipe",
            "ApPipeWithIsrTaskLSST.yaml": "apPipe",
            "ApPipeWithPreconvolution.yaml": "apPipe",
            "ApPipeWithFakes.yaml": "apPipe",
            "SingleFrame.yaml": "singleFrame",
            "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
            "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
            "RunIsrForCrosstalkSources.yaml": "runOverscan",
        }

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            if "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Run `PipelineStepTester` on each ingredient pipeline, using a fixed
        set of expected overall inputs.

        Pipelines containing an ``injectVisit`` task additionally expect the
        ``injection_catalog`` input. Outputs are deliberately not checked.
        """
        files = ResourcePath.findFileResources(
            [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
        )
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            if "injection/" in file.path:
                # The source-injection post-processing ingredient is a partial
                # pipeline merged into full AP pipelines at build time;
                # it is validated separately by test_injection_ingredient.
                continue
            with self.subTest(file=str(file)):
                # Built fresh for every file because it may be extended below.
                expected_inputs = {
                    # ISR
                    "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
                    "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
                    "opticsTransmission", "filterTransmission", "atmosphereTransmission",
                    "illumMaskedImage", "deferredChargeCalib",
                    # ISR-LSST
                    "bfk", "cti", "dnlLUT", "gain_correction",
                    # Everything else
                    "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
                    "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
                }
                # Detect source-injection pipelines by task label rather than
                # relying on filename conventions.
                temp_pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                temp_pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                if "injectVisit" in temp_pipeline.task_labels:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
                                           ],
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
                                      },
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            elif "injection/" in file.path:
                # PostInjectedTasksApPipe is not actually an AP pipeline
                continue
            elif "ApdbDeduplication" in file.path:
                # The task to export catalogs from the APDB and re-run
                # association is not intended to be part of Prompt Processing
                # or batch AP pipeline runs.
                continue
            elif "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Files without a registered synonym fall through to a
                # placeholder name, so the comparison below fails loudly.
                subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
                self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in files:
            if "injection/" in file.path:
                # PostInjectedTasksApPipe is not actually an AP pipeline
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                            f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step?
                # Each unordered pair is checked once; sorting keeps the
                # order (and hence the first reported failure) deterministic.
                for set1, set2 in itertools.combinations(sorted(required_subsets), 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                        f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_injection_ingredient(self):
        """Test the source-injection post-processing ingredient pipeline.

        PostInjectedTasksApPipe is a partial pipeline merged into full AP
        pipelines at build time by make_injection_pipeline. This test
        validates that it can be loaded and contains the expected tasks.
        It does not attempt to build a graph from the partial pipeline.
        """
        ingredient = self.path.join("_ingredients/injection/PostInjectedTasksApPipe.yaml")
        with self.subTest(file=str(ingredient)):
            pipeline = lsst.pipe.base.Pipeline.from_uri(ingredient)
            expected_tasks = {
                "injectedMatchDiaSrc",
                "injectedMatchAssocDiaSrc",
                "consolidateMatchDiaSrc",
                "consolidateMatchAssocDiaSrc",
            }
            # Superset check: extra tasks in the ingredient are allowed.
            self.assertGreaterEqual(
                set(pipeline.task_labels),
                expected_tasks,
                msg="Source-injection post-processing ingredient is missing expected tasks.",
            )

    def test_generated_pipeline_readiness(self):
        """Test that the generated ApPipeWithFakes ingredient exists and is valid.

        pipelines/_ingredients/ApPipeWithFakes.yaml is generated at build time
        by make_injection_pipeline (invoked via scons). This test verifies that
        generation occurred before pipeline tests run, and that the generated
        pipeline includes the source-injection task. If the file is missing,
        the test fails (it is not skipped); run 'scons' in the ap_pipe root
        directory first.
        """
        generated = self.path.join("_ingredients/ApPipeWithFakes.yaml")
        if not generated.exists():
            # Fail the test with a message that explains how to fix the
            # problem, rather than silently skipping it.
            self.fail(
                f"{generated} has not been generated yet. "
                "Run 'scons' in the ap_pipe root directory to generate it."
            )
        with self.subTest(file=str(generated)):
            pipeline = lsst.pipe.base.Pipeline.from_uri(generated)
            pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
            self.assertIn(
                "injectVisit",
                pipeline.task_labels,
                msg="Generated ApPipeWithFakes.yaml is missing the 'injectVisit' task.",
            )

    def test_preconvolution_isr_matches_ap_pipe(self):
        """Test that, for each instrument, ApPipeWithPreconvolution defines
        the same isr task (class and config) as the corresponding ApPipe.

        Preconvolution changes only image subtraction and DIA-source
        detection; instrument signature removal must be unaffected.
        """
        files = [
            f for f in ResourcePath.findFileResources(
                [self.path], file_filter=r"^ApPipeWithPreconvolution\.yaml$"
            )
            if "_ingredients" not in f.path
        ]
        # Sanity-check that this test actually has cameras to compare.
        self.assertGreater(len(files), 0,
                           msg="No camera-specific ApPipeWithPreconvolution.yaml files found.")

        for precon_file in files:
            with self.subTest(file=str(precon_file)):
                base_file = precon_file.dirname().join("ApPipe.yaml")
                self.assertTrue(base_file.exists(),
                                msg=f"Expected sibling ApPipe.yaml next to {precon_file}: "
                                    f"{base_file} does not exist.")

                precon = lsst.pipe.base.Pipeline.from_uri(precon_file)
                base = lsst.pipe.base.Pipeline.from_uri(base_file)
                # apdb_config has no default and must be set before to_graph().
                precon.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                base.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")

                precon_isr = precon.to_graph().tasks["isr"]
                base_isr = base.to_graph().tasks["isr"]

                self.assertEqual(precon_isr.task_class_name, base_isr.task_class_name,
                                 msg="isr task class differs between ApPipe.yaml and "
                                     f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}.")
                # Can't just do `assertEqual(precon_isr, base_isr)` since
                # Task nodes are intentionally not equality comparable.
                self.assertTrue(
                    base_isr.config.compare(precon_isr.config, shortcut=False),
                    msg="isr task config differs between ApPipe.yaml and "
                        f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}."
                )

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            if "_ingredients" not in f.path
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
                if not generic.exists():
                    # No same-named generic pipeline, so nothing to compare.
                    continue
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                            f"{generic_subsets - special_subsets}.")

299 

300 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Inherit everything from `lsst.utils.tests.MemoryTestCase`; this
    subclass adds no members of its own.
    """

303 

304 

def setup_module(module):
    """Call `lsst.utils.tests.init` for this test module.

    Parameters
    ----------
    module
        Unused by this function.
    """
    lsst.utils.tests.init()

307 

308 

if __name__ == "__main__":
    # Executed directly as a script: call lsst.utils.tests.init() first,
    # then hand control to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()